• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud_syncer.h"
16 
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20 
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35 
36 namespace DistributedDB {
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy,bool isKvScene,SingleVerConflictResolvePolicy policy)37 CloudSyncer::CloudSyncer(
38     std::shared_ptr<StorageProxy> storageProxy, bool isKvScene, SingleVerConflictResolvePolicy policy)
39     : lastTaskId_(INVALID_TASK_ID),
40       storageProxy_(std::move(storageProxy)),
41       queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
42       closed_(false),
43       hasKvRemoveTask(false),
44       timerId_(0u),
45       isKvScene_(isKvScene),
46       policy_(policy),
47       asyncTaskId_(INVALID_TASK_ID),
48       cancelAsyncTask_(false),
49       scheduleTaskCount_(0),
50       waitDownloadListener_(nullptr)
51 {
52     if (storageProxy_ != nullptr) {
53         id_ = storageProxy_->GetIdentify();
54     }
55     InitCloudSyncStateMachine();
56 }
57 
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)58 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
59     const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
60 {
61     CloudTaskInfo taskInfo;
62     taskInfo.mode = mode;
63     taskInfo.table = tables;
64     taskInfo.callback = callback;
65     taskInfo.timeout = waitTime;
66     taskInfo.devices = devices;
67     for (const auto &item: tables) {
68         QuerySyncObject syncObject;
69         syncObject.SetTableName(item);
70         taskInfo.queryList.push_back(syncObject);
71     }
72     return Sync(taskInfo);
73 }
74 
Sync(const CloudTaskInfo & taskInfo)75 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
76 {
77     int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
78     if (errCode != E_OK) {
79         return errCode;
80     }
81     if (cloudDB_.IsNotExistCloudDB()) {
82         LOGE("[CloudSyncer] Not set cloudDB!");
83         return -E_CLOUD_ERROR;
84     }
85     if (closed_) {
86         LOGE("[CloudSyncer] DB is closed!");
87         return -E_DB_CLOSED;
88     }
89     if (hasKvRemoveTask) {
90         LOGE("[CloudSyncer] has remove task, stop sync task!");
91         return -E_CLOUD_ERROR;
92     }
93     CloudTaskInfo info = taskInfo;
94     info.status = ProcessStatus::PREPARED;
95     errCode = TryToAddSyncTask(std::move(info));
96     if (errCode != E_OK) {
97         return errCode;
98     }
99     return TriggerSync();
100 }
101 
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)102 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
103 {
104     cloudDB_.SetCloudDB(cloudDB);
105     LOGI("[CloudSyncer] SetCloudDB finish");
106 }
107 
GetCloudDB() const108 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
109 {
110     return cloudDB_.GetCloudDB();
111 }
112 
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)113 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
114 {
115     storageProxy_->SetIAssetLoader(loader);
116     cloudDB_.SetIAssetLoader(loader);
117     LOGI("[CloudSyncer] SetIAssetLoader finish");
118 }
119 
Close()120 void CloudSyncer::Close()
121 {
122     closed_ = true;
123     CloudSyncer::TaskId currentTask;
124     {
125         std::lock_guard<std::mutex> autoLock(dataLock_);
126         currentTask = currentContext_.currentTaskId;
127     }
128     // mark current task db_closed
129     SetTaskFailed(currentTask, -E_DB_CLOSED);
130     UnlockIfNeed();
131     cloudDB_.Close();
132     WaitCurTaskFinished();
133 
134     // copy all task from queue
135     std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
136     for (auto &info: infoList) {
137         LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
138     }
139     storageProxy_->Close();
140 }
141 
TriggerSync()142 int CloudSyncer::TriggerSync()
143 {
144     if (closed_) {
145         return -E_DB_CLOSED;
146     }
147     RefObject::IncObjRef(this);
148     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
149         DoSyncIfNeed();
150         RefObject::DecObjRef(this);
151     });
152     if (errCode != E_OK) {
153         LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
154         RefObject::DecObjRef(this);
155     }
156     return errCode;
157 }
158 
SetProxyUser(const std::string & user)159 void CloudSyncer::SetProxyUser(const std::string &user)
160 {
161     std::lock_guard<std::mutex> autoLock(dataLock_);
162     if (storageProxy_ != nullptr) {
163         storageProxy_->SetUser(user);
164     }
165     if (currentContext_.notifier != nullptr) {
166         currentContext_.notifier->SetUser(user);
167     }
168     currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
169     cloudDB_.SwitchCloudDB(user);
170 }
171 
DoSyncIfNeed()172 void CloudSyncer::DoSyncIfNeed()
173 {
174     if (closed_) {
175         return;
176     }
177     // do all sync task in this loop
178     do {
179         // get taskId from queue
180         TaskId triggerTaskId = GetNextTaskId();
181         if (triggerTaskId == INVALID_TASK_ID) {
182             LOGD("[CloudSyncer] task queue empty");
183             break;
184         }
185         // pop taskId in queue
186         if (PrepareSync(triggerTaskId) != E_OK) {
187             break;
188         }
189         //  if the task is compensated task, we should get the query data and user list.
190         if (IsCompensatedTask(triggerTaskId) && !TryToInitQueryAndUserListForCompensatedSync(triggerTaskId)) {
191             // try to init query fail, we finish the compensated task.
192             continue;
193         }
194         CancelBackgroundDownloadAssetsTaskIfNeed();
195         // do sync logic
196         std::vector<std::string> usersList;
197         {
198             std::lock_guard<std::mutex> autoLock(dataLock_);
199             usersList = cloudTaskInfos_[triggerTaskId].users;
200             currentContext_.currentUserIndex = 0;
201         }
202         int errCode = E_OK;
203         if (usersList.empty()) {
204             SetProxyUser("");
205             errCode = DoSync(triggerTaskId);
206         } else {
207             for (const auto &user : usersList) {
208                 SetProxyUser(user);
209                 errCode = DoSync(triggerTaskId);
210             }
211         }
212         LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
213     } while (!closed_);
214     LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
215 }
216 
DoSync(TaskId taskId)217 int CloudSyncer::DoSync(TaskId taskId)
218 {
219     std::lock_guard<std::mutex> lock(syncMutex_);
220     ResetCurrentTableUploadBatchIndex();
221     CloudTaskInfo taskInfo;
222     {
223         std::lock_guard<std::mutex> autoLock(dataLock_);
224         taskInfo = cloudTaskInfos_[taskId];
225         cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
226         LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
227             static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
228     }
229     bool needUpload = true;
230     bool isNeedFirstDownload = false;
231     bool isLockAction = IsLockInDownload();
232     {
233         std::lock_guard<std::mutex> autoLock(dataLock_);
234         if (currentContext_.strategy != nullptr) {
235             needUpload = currentContext_.strategy->JudgeUpload();
236         }
237         // 1. if the locker is already exist, directly reuse the lock, no need do the first download
238         // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
239         isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload) &&
240             isLockAction;
241     }
242     bool isFirstDownload = true;
243     if (isNeedFirstDownload) {
244         // do first download
245         int errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
246         SetTaskFailed(taskId, errCode);
247         if (errCode != E_OK) {
248             SyncMachineDoFinished();
249             return errCode;
250         }
251         bool isActuallyNeedUpload = false;  // whether the task actually has data to upload
252         {
253             std::lock_guard<std::mutex> autoLock(dataLock_);
254             isActuallyNeedUpload = currentContext_.isNeedUpload;
255         }
256         if (!isActuallyNeedUpload) {
257             LOGI("[CloudSyncer] no table need upload!");
258             SyncMachineDoFinished();
259             return E_OK;
260         }
261         isFirstDownload = false;
262     }
263 
264     {
265         std::lock_guard<std::mutex> autoLock(dataLock_);
266         currentContext_.isFirstDownload = isFirstDownload;
267         currentContext_.isRealNeedUpload = needUpload;
268     }
269     return DoSyncInner(taskInfo);
270 }
271 
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)272 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
273 {
274     {
275         std::lock_guard<std::mutex> autoLock(dataLock_);
276         currentContext_.tableName = taskInfo.table[index];
277     }
278     int errCode = CheckTaskIdValid(taskInfo.taskId);
279     if (errCode != E_OK) {
280         LOGE("[CloudSyncer] task is invalid, abort sync");
281         return errCode;
282     }
283     if (taskInfo.table.empty()) {
284         LOGE("[CloudSyncer] Invalid taskInfo table");
285         return -E_INVALID_ARGS;
286     }
287     errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
288     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
289         {
290             std::lock_guard<std::mutex> autoLock(dataLock_);
291             currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
292                 taskInfo.table[index], false);
293             LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
294         }
295         return errCode;
296     }
297     if (errCode != E_OK) {
298         LOGE("[CloudSyncer] upload failed %d", errCode);
299         return errCode;
300     }
301     return errCode;
302 }
303 
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)304 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
305 {
306     if (!needUpload || taskInfo.isAssetsOnly) {
307         return E_OK;
308     }
309     storageProxy_->BeforeUploadTransaction();
310     int errCode = CloudSyncUtils::StartTransactionIfNeed(taskInfo, storageProxy_);
311     if (errCode != E_OK) {
312         LOGE("[CloudSyncer] start transaction failed before doing upload.");
313         return errCode;
314     }
315     for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
316         LOGD("[CloudSyncer] try upload table, index: %zu, table name: %s, length: %u",
317             i, DBCommon::StringMiddleMasking(taskInfo.table[i]).c_str(), taskInfo.table[i].length());
318         if (IsTableFinishInUpload(taskInfo.table[i])) {
319             continue;
320         }
321         errCode = PrepareAndUpload(taskInfo, i);
322         if (errCode == -E_TASK_PAUSED) { // should re download [paused table, last table]
323             for (size_t j = i; j < taskInfo.table.size(); ++j) {
324                 MarkDownloadFinishIfNeed(taskInfo.table[j], false);
325             }
326         }
327         if (errCode != E_OK) {
328             break;
329         }
330         MarkUploadFinishIfNeed(taskInfo.table[i]);
331     }
332     if (errCode == -E_TASK_PAUSED) {
333         std::lock_guard<std::mutex> autoLock(dataLock_);
334         resumeTaskInfos_[taskInfo.taskId].upload = true;
335     }
336     CloudSyncUtils::EndTransactionIfNeed(errCode, taskInfo, storageProxy_);
337     return errCode;
338 }
339 
SyncMachineDoDownload()340 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
341 {
342     CloudTaskInfo taskInfo;
343     bool needUpload;
344     bool isFirstDownload;
345     {
346         std::lock_guard<std::mutex> autoLock(dataLock_);
347         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
348         needUpload = currentContext_.isRealNeedUpload;
349         isFirstDownload = currentContext_.isFirstDownload;
350     }
351     int errCode = E_OK;
352     if (IsLockInDownload()) {
353         errCode = LockCloudIfNeed(taskInfo.taskId);
354     }
355     if (errCode != E_OK) {
356         return SetCurrentTaskFailedInMachine(errCode);
357     }
358     errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
359     if (errCode != E_OK) {
360         if (errCode == -E_TASK_PAUSED) {
361             DBDfxAdapter::ReportBehavior(
362                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
363         } else {
364             DBDfxAdapter::ReportBehavior(
365                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
366         }
367         return SetCurrentTaskFailedInMachine(errCode);
368     }
369     DBDfxAdapter::ReportBehavior(
370         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
371     return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
372 }
373 
SyncMachineDoUpload()374 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
375 {
376     CloudTaskInfo taskInfo;
377     bool needUpload;
378     {
379         std::lock_guard<std::mutex> autoLock(dataLock_);
380         taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
381         needUpload = currentContext_.isRealNeedUpload;
382     }
383     DBDfxAdapter::ReportBehavior(
384         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
385     int errCode = DoUploadInNeed(taskInfo, needUpload);
386     if (errCode == -E_CLOUD_VERSION_CONFLICT) {
387         DBDfxAdapter::ReportBehavior(
388             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
389         return CloudSyncEvent::REPEAT_CHECK_EVENT;
390     }
391     if (errCode != E_OK) {
392         {
393             std::lock_guard<std::mutex> autoLock(dataLock_);
394             cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
395         }
396         if (errCode == -E_TASK_PAUSED) {
397             DBDfxAdapter::ReportBehavior(
398                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
399         } else {
400             DBDfxAdapter::ReportBehavior(
401                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
402         }
403         return CloudSyncEvent::ERROR_EVENT;
404     }
405     DBDfxAdapter::ReportBehavior(
406         {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
407     return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
408 }
409 
SyncMachineDoFinished()410 CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
411 {
412     UnlockIfNeed();
413     TaskId taskId;
414     int errCode;
415     int currentUserIndex;
416     int userListSize;
417     {
418         std::lock_guard<std::mutex> autoLock(dataLock_);
419         taskId = currentContext_.currentTaskId;
420         errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
421         currentUserIndex = currentContext_.currentUserIndex;
422         userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
423     }
424     if (currentUserIndex >= userListSize) {
425         {
426             std::lock_guard<std::mutex> autoLock(dataLock_);
427             cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
428         }
429         DoFinished(taskId, errCode);
430     } else {
431         CloudTaskInfo taskInfo;
432         {
433             std::lock_guard<std::mutex> autoLock(dataLock_);
434             taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
435         }
436         taskInfo.status = ProcessStatus::FINISHED;
437         if (currentContext_.notifier != nullptr) {
438             currentContext_.notifier->NotifyProcess(taskInfo, {}, true);
439         }
440         {
441             std::lock_guard<std::mutex> autoLock(dataLock_);
442             cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
443         }
444     }
445     return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
446 }
447 
DoSyncInner(const CloudTaskInfo & taskInfo)448 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
449 {
450     cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
451     DBDfxAdapter::ReportBehavior(
452         {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
453     return E_OK;
454 }
455 
DoFinished(TaskId taskId,int errCode)456 void CloudSyncer::DoFinished(TaskId taskId, int errCode)
457 {
458     storageProxy_->OnSyncFinish();
459     if (errCode == -E_TASK_PAUSED) {
460         DBDfxAdapter::ReportBehavior(
461             {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
462         LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
463         {
464             std::lock_guard<std::mutex> autoLock(dataLock_);
465             resumeTaskInfos_[taskId].context = std::move(currentContext_);
466             currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
467             resumeTaskInfos_[taskId].context.locker = nullptr;
468             ClearCurrentContextWithoutLock();
469         }
470         contextCv_.notify_all();
471         return;
472     }
473     {
474         std::lock_guard<std::mutex> autoLock(dataLock_);
475         RemoveTaskFromQueue(cloudTaskInfos_[taskId].priorityLevel, taskId);
476     }
477     ClearContextAndNotify(taskId, errCode);
478 }
479 
480 /**
481  * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
482 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)483 int CloudSyncer::UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList)
484 {
485     if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
486         return E_OK;
487     }
488     int ret = E_OK;
489     for (size_t j : param.withoutRowIdData.insertData) {
490         VBucket &datum = param.downloadData.data[j];
491         std::vector<Type> primaryValues;
492         ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
493             std::get<int64_t>(datum[DBConstant::ROWID]), primaryValues);
494         if (ret != E_OK) {
495             LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
496             return ret;
497         }
498         param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
499     }
500     for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
501         size_t downloadIndex = std::get<0>(tuple);
502         VBucket &datum = param.downloadData.data[downloadIndex];
503         size_t insertIdx = std::get<1>(tuple);
504         std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
505         pkVal[0] = datum[DBConstant::ROWID];
506     }
507     for (const auto &tuple : param.withoutRowIdData.updateData) {
508         size_t downloadIndex = std::get<0>(tuple);
509         size_t updateIndex = std::get<1>(tuple);
510         VBucket &datum = param.downloadData.data[downloadIndex];
511         size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
512         if (updateIndex >= size) {
513             LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
514             return -E_INTERNAL_ERROR;
515         }
516         if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
517             LOGE("[CloudSyncer] primary key value list should not be empty.");
518             return -E_INTERNAL_ERROR;
519         }
520         // no primary key or composite primary key, the first element is rowid
521         param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
522             datum[DBConstant::ROWID];
523     }
524     return ret;
525 }
526 
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)527 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
528 {
529     for (const auto &assetField : assetFields) {
530         if (data.find(assetField.colName) == data.end()) {
531             return false;
532         }
533         if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
534             if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
535                 return true;
536             }
537         }
538     }
539     return false;
540 }
541 
IsDataContainAssets()542 bool CloudSyncer::IsDataContainAssets()
543 {
544     std::lock_guard<std::mutex> autoLock(dataLock_);
545     bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
546     if (!hasTable) {
547         LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
548             -E_INTERNAL_ERROR);
549             return false;
550     }
551     if (currentContext_.assetFields[currentContext_.tableName].empty()) {
552         return false;
553     }
554     return true;
555 }
556 
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,bool isForcePullAseets,int & errCode)557 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
558     bool setNormalStatus, bool isForcePullAseets, int &errCode)
559 {
560     // Define a map to store the result
561     std::map<std::string, Assets> res = {};
562     std::vector<Field> assetFields;
563     {
564         std::lock_guard<std::mutex> autoLock(dataLock_);
565         assetFields = currentContext_.assetFields[currentContext_.tableName];
566     }
567     // For every column contain asset or assets, assetFields are in context
568     TagAssetsInfo tagAssetsInfo = {coveredData, beCoveredData, setNormalStatus, isForcePullAseets};
569     for (const Field &assetField : assetFields) {
570         Assets assets = TagAssetsInSingleCol(tagAssetsInfo, assetField, errCode);
571         if (!assets.empty()) {
572             res[assetField.colName] = assets;
573         }
574         if (errCode != E_OK) {
575             break;
576         }
577     }
578     return res;
579 }
580 
FillCloudAssets(InnerProcessInfo & info,VBucket & normalAssets,VBucket & failedAssets)581 int CloudSyncer::FillCloudAssets(InnerProcessInfo &info, VBucket &normalAssets, VBucket &failedAssets)
582 {
583     int ret = E_OK;
584     if (normalAssets.size() > 1) {
585         if (info.isAsyncDownload) {
586             ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, normalAssets, true);
587         } else {
588             ret = storageProxy_->FillCloudAssetForDownload(info.tableName, normalAssets, true);
589         }
590         if (ret != E_OK) {
591             LOGE("[CloudSyncer]%s download: Can not fill normal assets for download",
592                 info.isAsyncDownload ? "Async" : "Sync");
593             return ret;
594         }
595     }
596     if (failedAssets.size() > 1) {
597         if (info.isAsyncDownload) {
598             ret = storageProxy_->FillCloudAssetForAsyncDownload(info.tableName, failedAssets, false);
599         } else {
600             ret = storageProxy_->FillCloudAssetForDownload(info.tableName, failedAssets, false);
601         }
602         if (ret != E_OK) {
603             LOGE("[CloudSyncer]%s download: Can not fill abnormal assets for download",
604                 info.isAsyncDownload ? "Async" : "Sync");
605             return ret;
606         }
607     }
608     return E_OK;
609 }
610 
HandleDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,uint32_t & successCount)611 int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
612     DownloadCommitList &commitList, uint32_t &successCount)
613 {
614     successCount = 0;
615     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
616     if (errCode != E_OK) {
617         LOGE("[CloudSyncer] start transaction Failed before handle download.");
618         return errCode;
619     }
620     errCode = CommitDownloadAssets(downloadItem, info, commitList, successCount);
621     if (errCode != E_OK) {
622         successCount = 0;
623         int ret = E_OK;
624         if (errCode == -E_REMOVE_ASSETS_FAILED) {
625             // remove assets failed no effect to asset status, just commit
626             ret = storageProxy_->Commit();
627         } else {
628             ret = storageProxy_->Rollback();
629         }
630         LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
631         return errCode;
632     }
633     errCode = storageProxy_->Commit();
634     if (errCode != E_OK) {
635         successCount = 0;
636         LOGE("[CloudSyncer] commit failed %d", errCode);
637     }
638     return errCode;
639 }
640 
CloudDbDownloadAssets(TaskId taskId,InnerProcessInfo & info,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)641 int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
642     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
643 {
644     int downloadStatus = E_OK;
645     {
646         std::lock_guard<std::mutex> autoLock(dataLock_);
647         downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
648         resumeTaskInfos_[taskId].downloadStatus = E_OK;
649     }
650     int errorCode = E_OK;
651     DownloadCommitList commitList;
652     for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
653         errorCode = CheckTaskIdValid(taskId);
654         if (errorCode != E_OK) {
655             std::lock_guard<std::mutex> autoLock(dataLock_);
656             resumeTaskInfos_[taskId].lastDownloadIndex = i;
657             resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
658             break;
659         }
660         DownloadItem downloadItem;
661         GetDownloadItem(downloadList, i, downloadItem);
662         errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
663         if (errorCode == -E_NOT_SET) {
664             info.downLoadInfo.failCount += (downloadList.size() - i);
665             info.downLoadInfo.successCount -= (downloadList.size() - i);
666             return errorCode;
667         }
668         if (downloadItem.strategy == OpType::DELETE) {
669             downloadItem.assets = {};
670             downloadItem.gid = "";
671         }
672         // Process result of each asset
673         commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
674         downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
675         int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
676         if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
677             return ret;
678         }
679         downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
680     }
681     LOGD("Download status is %d", downloadStatus);
682     return errorCode == E_OK ? downloadStatus : errorCode;
683 }
684 
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)685 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
686     const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
687 {
688     if (!IsDataContainAssets()) {
689         return E_OK;
690     }
691     // update changed data info
692     if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
693         // changedData.primaryData should have no member inside
694         return -E_INVALID_ARGS;
695     }
696     changedAssets.tableName = info.tableName;
697     changedAssets.type = ChangedDataType::ASSET;
698     changedAssets.field = pKColNames;
699 
700     // Get AssetDownloadList
701     DownloadList changeList;
702     TaskId taskId;
703     {
704         std::lock_guard<std::mutex> autoLock(dataLock_);
705         changeList = currentContext_.assetDownloadList;
706         taskId = currentContext_.currentTaskId;
707     }
708     // Download data (include deleting) will handle return Code in this situation
709     int ret = E_OK;
710     if (RuntimeContext::GetInstance()->IsBatchDownloadAssets()) {
711         ret = CloudDbBatchDownloadAssets(taskId, changeList, dupHashKeySet, info, changedAssets);
712     } else {
713         ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
714     }
715     if (ret != E_OK) {
716         LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
717     }
718     return ret;
719 }
720 
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)721 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
722     VBucket &localAssetInfo)
723 {
724     int ret = E_OK;
725     OpType strategy = param.downloadData.opType[idx];
726     switch (strategy) {
727         case OpType::INSERT:
728         case OpType::UPDATE:
729         case OpType::DELETE:
730             ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
731             break;
732         case OpType::NOT_HANDLE:
733         case OpType::ONLY_UPDATE_GID:
734         case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
735             (void)TagAssetsInSingleRecord(
736                 localAssetInfo, param.downloadData.data[idx], true, param.isForcePullAseets, ret);
737             for (const auto &[col, value]: localAssetInfo) {
738                 param.downloadData.data[idx].insert_or_assign(col, value);
739             }
740             break;
741         }
742         default:
743             break;
744     }
745     return ret;
746 }
747 
HandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)748 int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
749     VBucket &localAssetInfo)
750 {
751     Type prefix;
752     std::vector<Type> pkVals;
753     OpType strategy = param.downloadData.opType[idx];
754     bool isDelStrategy = (strategy == OpType::DELETE);
755     int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
756         param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
757     if (ret != E_OK) {
758         LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
759         return ret;
760     }
761     prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
762     if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
763         LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
764         return -E_INTERNAL_ERROR;
765     }
766     AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
767     std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
768         false, param.isForcePullAseets, ret);
769     if (ret != E_OK) {
770         LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
771         return ret;
772     }
773     strategy = CloudSyncUtils::CalOpType(param, idx);
774     if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
775         param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
776     }
777     param.assetsDownloadList.push_back(
778         std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
779         pkVals, dataInfo.cloudLogInfo.timestamp));
780     return ret;
781 }
782 
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList,std::map<std::string,LogInfo> & localLogInfoCache,std::vector<VBucket> & localInfo)783 int CloudSyncer::SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
784     std::map<std::string, LogInfo> &localLogInfoCache, std::vector<VBucket> &localInfo)
785 {
786     int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
787     if (ret != E_OK) {
788         LOGE("[CloudSyncer] Invalid download data:%d", ret);
789         return ret;
790     }
791     CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
792     DataInfo dataInfo;
793     VBucket localAssetInfo;
794     bool isExist = true;
795     ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
796     if (ret == -E_NOT_FOUND) {
797         isExist = false;
798     } else if (ret != E_OK) {
799         LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
800         return ret;
801     }
802     // Get cloudLogInfo from cloud data
803     dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
804     // Tag datum to get opType
805     ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
806     if (ret != E_OK) {
807         LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
808         return ret;
809     }
810     CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
811         localLogInfoCache);
812 
813     if (param.isAssetsOnly && param.downloadData.opType[idx] != OpType::LOCKED_NOT_HANDLE) {
814         // if data is lock, not need to save the local info.
815         auto findGid = param.downloadData.data[idx].find(CloudDbConstant::GID_FIELD);
816         if (findGid == param.downloadData.data[idx].end()) {
817             LOGE("[CloudSyncer] data doe's not have gid field when download only asset: %d.", -E_NOT_FOUND);
818             return -E_NOT_FOUND;
819         }
820         localAssetInfo[CloudDbConstant::GID_FIELD] = findGid->second;
821         localAssetInfo[CloudDbConstant::HASH_KEY_FIELD] = dataInfo.localInfo.logInfo.hashKey;
822         localInfo.push_back(localAssetInfo);
823         return ret; // assets only not need to save changed data without assets.
824     }
825 
826     ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
827     if (ret != E_OK) {
828         LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
829     }
830     return ret;
831 }
832 
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)833 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam &param)
834 {
835     if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
836         LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
837         return -E_INVALID_ARGS;
838     }
839     // Update download batch Info
840     param.info.downLoadInfo.batchIndex += 1;
841     param.info.downLoadInfo.total += param.downloadData.data.size();
842     param.info.retryInfo.downloadBatchOpCount = 0;
843     int ret = E_OK;
844     param.deletePrimaryKeySet.clear();
845     param.dupHashKeySet.clear();
846     CloudSyncUtils::ClearWithoutData(param);
847     std::vector<std::pair<Key, size_t>> deletedList;
848     // use for record local delete status
849     std::map<std::string, LogInfo> localLogInfoCache;
850     std::vector<VBucket> localInfo;
851     for (size_t i = 0; i < param.downloadData.data.size(); i++) {
852         ret = SaveDatum(param, i, deletedList, localLogInfoCache, localInfo);
853         if (ret != E_OK) {
854             param.info.downLoadInfo.failCount += param.downloadData.data.size();
855             LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
856             return ret;
857         }
858     }
859     // Save assetsMap into current context
860     {
861         std::lock_guard<std::mutex> autoLock(dataLock_);
862         currentContext_.assetDownloadList = param.assetsDownloadList;
863     }
864 
865     ret = PutCloudSyncDataOrUpdateStatusForAssetOnly(param, localInfo);
866     if (ret != E_OK) {
867         return ret;
868     }
869 
870     ret = UpdateChangedData(param, currentContext_.assetDownloadList);
871     if (ret != E_OK) {
872         param.info.downLoadInfo.failCount += param.downloadData.data.size();
873         LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
874         return ret;
875     }
876     // Update downloadInfo
877     param.info.downLoadInfo.successCount += param.downloadData.data.size();
878     // Get latest cloudWaterMark
879     VBucket &lastData = param.downloadData.data.back();
880     if (!IsNeedProcessCloudCursor(taskId) && param.isLastBatch) {
881         // the last batch of cursor in the conditional query is useless
882         param.cloudWaterMark = {};
883     } else {
884         param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
885     }
886     return param.isAssetsOnly ? E_OK : UpdateFlagForSavedRecord(param);
887 }
888 
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)889 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
890 {
891     // Check Input and Context Validity
892     int ret = CheckTaskIdValid(taskId);
893     if (ret != E_OK) {
894         return ret;
895     }
896     {
897         std::lock_guard<std::mutex> autoLock(dataLock_);
898         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
899             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
900             return -E_INVALID_ARGS;
901         }
902     }
903     if (currentContext_.strategy == nullptr) {
904         LOGE("[CloudSyncer] Strategy has not been initialized");
905         return -E_INVALID_ARGS;
906     }
907     ret = storageProxy_->CheckSchema(tableName);
908     if (ret != E_OK) {
909         LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
910         return ret;
911     }
912     return E_OK;
913 }
914 
NeedNotifyChangedData(const ChangedData & changedData)915 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
916 {
917     TaskId taskId;
918     {
919         std::lock_guard<std::mutex> autoLock(dataLock_);
920         taskId = currentContext_.currentTaskId;
921     }
922     if (IsModeForcePush(taskId)) {
923         return false;
924     }
925     // when there have no data been changed, it don't need fill back
926     if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
927         changedData.primaryData[OP_DELETE].empty()) {
928         return false;
929     }
930     return true;
931 }
932 
NotifyChangedDataInCurrentTask(ChangedData && changedData)933 int CloudSyncer::NotifyChangedDataInCurrentTask(ChangedData &&changedData)
934 {
935     if (!NeedNotifyChangedData(changedData)) {
936         return E_OK;
937     }
938     std::string deviceName;
939     {
940         std::lock_guard<std::mutex> autoLock(dataLock_);
941         std::vector<std::string> devices = currentContext_.notifier->GetDevices();
942         if (devices.empty()) {
943             DBDfxAdapter::ReportBehavior(
944                 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
945             LOGE("[CloudSyncer] CurrentContext do not contain device info");
946             return -E_CLOUD_ERROR;
947         }
948         // We use first device name as the target of NotifyChangedData
949         deviceName = devices[0];
950     }
951     return CloudSyncUtils::NotifyChangeData(deviceName, storageProxy_, std::move(changedData));
952 }
953 
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)954 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
955 {
956     if (!isFirstDownload && param.downloadData.data.empty()) {
957         // if the second download and there is no download data, do not notify
958         return;
959     }
960     std::lock_guard<std::mutex> autoLock(dataLock_);
961     if (currentContext_.strategy->JudgeUpload()) {
962         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
963     } else {
964         if (param.isLastBatch) {
965             param.info.tableStatus = ProcessStatus::FINISHED;
966         }
967         if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
968             currentContext_.notifier->UpdateProcess(param.info);
969         } else {
970             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
971         }
972     }
973 }
974 
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)975 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam &param)
976 {
977     int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
978     if (ret != E_OK) {
979         LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
980         return ret;
981     }
982     uint64_t cursor = DBConstant::INVALID_CURSOR;
983     std::string maskStoreId = DBCommon::StringMiddleMasking(GetStoreIdByTask(taskId));
984     std::string maskTableName = DBCommon::StringMiddleMasking(param.info.tableName);
985     if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
986         LOGI("[CloudSyncer] cursor before save data is %llu, db: %s, table: %s, task id: %llu", cursor,
987             maskStoreId.c_str(), maskTableName.c_str(), taskId);
988     }
989     (void)storageProxy_->SetCursorIncFlag(true);
990     if (!IsModeForcePush(taskId)) {
991         param.changedData.tableName = param.info.tableName;
992         param.changedData.field = param.pkColNames;
993         param.changedData.type = ChangedDataType::DATA;
994     }
995     ret = SaveData(taskId, param);
996     (void)storageProxy_->SetCursorIncFlag(false);
997     if (storageProxy_->GetCursor(param.info.tableName, cursor) == E_OK) {
998         LOGI("[CloudSyncer] cursor after save data is %llu, db: %s, table: %s, task id: %llu", cursor,
999              maskStoreId.c_str(), maskTableName.c_str(), taskId);
1000     }
1001     param.insertPk.clear();
1002     if (ret != E_OK) {
1003         LOGE("[CloudSyncer] cannot save data: %d.", ret);
1004         int rollBackErrorCode = storageProxy_->Rollback();
1005         if (rollBackErrorCode != E_OK) {
1006             LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1007         } else {
1008             LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1009         }
1010         return ret;
1011     }
1012     ret = storageProxy_->Commit();
1013     if (ret != E_OK) {
1014         LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1015     }
1016     return ret;
1017 }
1018 
DoDownloadAssets(SyncParam & param)1019 int CloudSyncer::DoDownloadAssets(SyncParam &param)
1020 {
1021     // Begin downloading assets
1022     ChangedData changedAssets;
1023     int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1024     bool isSharedTable = false;
1025     int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1026     if (errCode != E_OK) {
1027         LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1028         return errCode;
1029     }
1030     if (!isSharedTable) {
1031         (void)NotifyChangedDataInCurrentTask(std::move(changedAssets));
1032     }
1033     if (ret != E_OK) {
1034         LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1035     } else {
1036         param.assetsDownloadList.clear();
1037     }
1038     return ret;
1039 }
1040 
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param,bool downloadAssetOnly)1041 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param, bool downloadAssetOnly)
1042 {
1043     InnerProcessInfo tmpInfo = param.info;
1044     int ret = E_OK;
1045     if (!downloadAssetOnly) {
1046         ChangedData changedData;
1047         param.changedData = changedData;
1048         param.downloadData.opType.resize(param.downloadData.data.size());
1049         param.downloadData.existDataKey.resize(param.downloadData.data.size());
1050         param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1051         ret = SaveDataInTransaction(taskId, param);
1052         if (ret != E_OK) {
1053             return ret;
1054         }
1055         // call OnChange to notify changedData object first time (without Assets)
1056         ret = NotifyChangedDataInCurrentTask(std::move(param.changedData));
1057         if (ret != E_OK) {
1058             LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1059             return ret;
1060         }
1061     }
1062     ret = DoDownloadAssets(param);
1063     if (ret == -E_TASK_PAUSED) {
1064         param.info = tmpInfo;
1065         if (IsAsyncDownloadAssets(taskId)) {
1066             UpdateCloudWaterMark(taskId, param);
1067             (void)SaveCloudWaterMark(param.tableName, taskId);
1068         }
1069     }
1070     if (ret != E_OK) {
1071         return ret;
1072     }
1073     UpdateCloudWaterMark(taskId, param);
1074     return E_OK;
1075 }
1076 
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1077 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1078     bool lastBatch)
1079 {
1080     CloudTaskInfo taskInfo;
1081     {
1082         std::lock_guard<std::mutex> autoLock(dataLock_);
1083         taskInfo = cloudTaskInfos_[uploadParam.taskId];
1084     }
1085     std::lock_guard<std::mutex> autoLock(dataLock_);
1086     if (uploadParam.lastTable && lastBatch) {
1087         currentContext_.notifier->UpdateProcess(innerProcessInfo);
1088     } else {
1089         currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1090     }
1091 }
1092 
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1093 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1094 {
1095     SyncParam param;
1096     int errCode = GetSyncParamForDownload(taskId, param);
1097     if (errCode != E_OK) {
1098         LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1099         return errCode;
1100     }
1101     for (auto it = param.assetsDownloadList.begin(); it != param.assetsDownloadList.end();) {
1102         if (std::get<CloudSyncUtils::STRATEGY_INDEX>(*it) != OpType::DELETE) {
1103             it = param.assetsDownloadList.erase(it);
1104         } else {
1105             it++;
1106         }
1107     }
1108     (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1109     errCode = DoDownloadInner(taskId, param, isFirstDownload);
1110     (void)storageProxy_->ClearAllTempSyncTrigger();
1111     if (errCode == -E_TASK_PAUSED) {
1112         // No need to handle ret.
1113         int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1114         if (ret != E_OK) {
1115             LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1116         }
1117         std::lock_guard<std::mutex> autoLock(dataLock_);
1118         resumeTaskInfos_[taskId].syncParam = std::move(param);
1119     }
1120     return errCode;
1121 }
1122 
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1123 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload)
1124 {
1125     // Query data by batch until reaching end and not more data need to be download
1126     int ret = PreCheck(taskId, param.info.tableName);
1127     if (ret != E_OK) {
1128         return ret;
1129     }
1130     do {
1131         ret = DownloadOneBatch(taskId, param, isFirstDownload);
1132         if (ret != E_OK) {
1133             return ret;
1134         }
1135     } while (!param.isLastBatch);
1136     return E_OK;
1137 }
1138 
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1139 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1140 {
1141     std::lock_guard<std::mutex> autoLock(dataLock_);
1142     if (currentContext_.strategy->JudgeUpload()) {
1143         currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1144     } else {
1145         info.tableStatus = FINISHED;
1146         if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1147             currentContext_.notifier->UpdateProcess(info);
1148         } else {
1149             currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1150         }
1151     }
1152 }
1153 
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1154 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1155 {
1156     int ret = PreCheck(taskId, tableName);
1157     if (ret != E_OK) {
1158         return ret;
1159     }
1160     {
1161         std::lock_guard<std::mutex> autoLock(dataLock_);
1162         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1163             LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1164             return -E_INVALID_ARGS;
1165         }
1166         if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1167             (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1168             LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1169                 static_cast<int>(cloudTaskInfos_[taskId].mode));
1170             return -E_INVALID_ARGS;
1171         }
1172     }
1173 
1174     return ret;
1175 }
1176 
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1177 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1178     InnerProcessInfo &innerProcessInfo)
1179 {
1180     int errCode = E_OK;
1181     if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1182         errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1183         if (errCode != E_OK) {
1184             return errCode;
1185         }
1186     }
1187 
1188     if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1189         errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1190         if (errCode != E_OK) {
1191             return errCode;
1192         }
1193     }
1194 
1195     if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1196         errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1197         if (errCode != E_OK) {
1198             return errCode;
1199         }
1200     }
1201 
1202     if (!uploadData.lockData.rowid.empty()) {
1203         errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1204         if (errCode != E_OK) {
1205             LOGE("[CloudSyncer] Failed to fill cloud log and asset after save upload data: %d", errCode);
1206         }
1207     }
1208     return errCode;
1209 }
1210 
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1211 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1212 {
1213     int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1214     if (errCode != E_OK) {
1215         LOGE("[CloudSyncer] Failed to fill cloud log and asset before save upload data: %d", errCode);
1216         return errCode;
1217     }
1218     Info insertInfo;
1219     Info updateInfo;
1220     Info deleteInfo;
1221     errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1222     if (errCode != E_OK) {
1223         return errCode;
1224     }
1225     bool lastBatch = (innerProcessInfo.upLoadInfo.successCount + innerProcessInfo.upLoadInfo.failCount) >=
1226                      innerProcessInfo.upLoadInfo.total;
1227     if (lastBatch) {
1228         innerProcessInfo.upLoadInfo.total =
1229             (innerProcessInfo.upLoadInfo.successCount + innerProcessInfo.upLoadInfo.failCount);
1230         innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1231     }
1232     // After each batch upload successed, call NotifyProcess
1233     NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1234 
1235     // if batch upload successed, update local water mark
1236     // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1237     errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1238     if (errCode != E_OK) {
1239         LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1240     }
1241     return errCode;
1242 }
1243 
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1244 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1245 {
1246     int errCode = E_OK;
1247     storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1248     // if we use local cover cloud strategy, it won't update local water mark also.
1249     if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1250         !IsNeedProcessCloudCursor(uploadParam.taskId))) {
1251         return E_OK;
1252     }
1253     errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1254     if (errCode != E_OK) {
1255         LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1256     }
1257     return errCode;
1258 }
1259 
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1260 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1261 {
1262     std::string tableName;
1263     int ret = GetCurrentTableName(tableName);
1264     if (ret != E_OK) {
1265         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1266         return ret;
1267     }
1268 
1269     Timestamp localMark = 0u;
1270     ret = PreCheckUpload(taskId, tableName, localMark);
1271     if (ret != E_OK) {
1272         LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1273         return ret;
1274     }
1275     ReloadWaterMarkIfNeed(taskId, localMark);
1276     storageProxy_->OnUploadStart();
1277 
1278     int64_t count = 0;
1279     ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1280         IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1281     LOGI("get upload count:%zu", count);
1282     if (ret != E_OK) {
1283         // GetUploadCount will return E_OK when upload count is zero.
1284         LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1285         return ret;
1286     }
1287     if (count == 0) {
1288         UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1289         return E_OK;
1290     }
1291     UploadParam param;
1292     param.count = count;
1293     param.lastTable = lastTable;
1294     param.taskId = taskId;
1295     param.lockAction = lockAction;
1296     return DoUploadInner(tableName, param);
1297 }
1298 
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1299 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1300     CloudSyncData &uploadData)
1301 {
1302     // Precheck and calculate local water mark which would be updated if batch upload successed.
1303     int ret = CheckTaskIdValid(uploadParam.taskId);
1304     if (ret != E_OK) {
1305         return ret;
1306     }
1307     ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1308         innerProcessInfo.upLoadInfo.total);
1309     if (ret != E_OK) {
1310         LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1311         return ret;
1312     }
1313     TagUploadAssets(uploadData);
1314     CloudSyncUtils::UpdateAssetsFlag(uploadData);
1315     // get local water mark to be updated in future.
1316     ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1317         uploadParam.taskId, uploadParam.localMark);
1318     if (ret != E_OK) {
1319         LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1320     }
1321     return ret;
1322 }
1323 
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1324 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1325 {
1326     {
1327         std::lock_guard<std::mutex> autoLock(dataLock_);
1328         if (cloudTaskInfos_[taskId].isAssetsOnly) {
1329             // assset only task does not need to save water mark.
1330             return E_OK;
1331         }
1332     }
1333     std::string cloudWaterMark;
1334     bool isUpdateCloudCursor = true;
1335     {
1336         std::lock_guard<std::mutex> autoLock(dataLock_);
1337         if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1338             currentContext_.cloudWaterMarks.end() ||
1339             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1340             currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1341             LOGD("[CloudSyncer] Not found water mark just return");
1342             return E_OK;
1343         }
1344         cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1345         isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1346     }
1347     isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsNeedProcessCloudCursor(taskId));
1348     if (isUpdateCloudCursor) {
1349         int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1350         if (errCode != E_OK) {
1351             LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1352         }
1353         return errCode;
1354     }
1355     return E_OK;
1356 }
1357 
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1358 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1359 {
1360     std::lock_guard<std::mutex> autoLock(dataLock_);
1361     uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1362     uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1363 }
1364 
IsModeForcePush(TaskId taskId)1365 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1366 {
1367     std::lock_guard<std::mutex> autoLock(dataLock_);
1368     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1369 }
1370 
IsModeForcePull(const TaskId taskId)1371 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1372 {
1373     std::lock_guard<std::mutex> autoLock(dataLock_);
1374     return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1375 }
1376 
IsPriorityTask(TaskId taskId)1377 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1378 {
1379     std::lock_guard<std::mutex> autoLock(dataLock_);
1380     return cloudTaskInfos_[taskId].priorityTask;
1381 }
1382 
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1383 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1384 {
1385     // type index of field in fields, true means mandatory filed
1386     static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1387         std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1388         std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1389         std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1390         std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1391         std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1392         std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1393     };
1394 
1395     for (const auto &fieldIndex : fieldAndIndex) {
1396         if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1397             if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1398                 continue;
1399             }
1400             LOGE("[CloudSyncer] Cloud data do not contain expected field.");
1401             return -E_CLOUD_ERROR;
1402         }
1403         if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1404             LOGE("[CloudSyncer] Cloud data's field, doesn't has expected type.");
1405             return -E_CLOUD_ERROR;
1406         }
1407     }
1408 
1409     if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1410         CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1411     }
1412     std::lock_guard<std::mutex> autoLock(dataLock_);
1413     if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1414         LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1415         return -E_CLOUD_ERROR;
1416     }
1417     return E_OK;
1418 }
1419 
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1420 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1421     DownloadData &downloadData)
1422 {
1423     VBucket extend;
1424     int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1425     if (ret != E_OK) {
1426         return ret;
1427     }
1428     ret = cloudDB_.Query(tableName, extend, downloadData.data);
1429     if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1430         if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1431             LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1432             return -E_CLOUD_ERROR;
1433         }
1434         cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1435         LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1436         return ret;
1437     }
1438     if (ret == -E_QUERY_END) {
1439         LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1440         return -E_QUERY_END;
1441     }
1442     if (ret != E_OK) {
1443         LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1444     }
1445     return ret;
1446 }
1447 
CheckTaskIdValid(TaskId taskId)1448 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1449 {
1450     if (closed_) {
1451         LOGE("[CloudSyncer] DB is closed.");
1452         return -E_DB_CLOSED;
1453     }
1454     std::lock_guard<std::mutex> autoLock(dataLock_);
1455     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1456         LOGE("[CloudSyncer] not found task.");
1457         return -E_INVALID_ARGS;
1458     }
1459     if (cloudTaskInfos_[taskId].pause) {
1460         LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1461         return -E_TASK_PAUSED;
1462     }
1463     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1464         LOGE("[CloudSyncer] task %" PRIu64 " has error: %d", taskId, cloudTaskInfos_[taskId].errCode);
1465         return cloudTaskInfos_[taskId].errCode;
1466     }
1467     return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1468 }
1469 
GetCurrentTableName(std::string & tableName)1470 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1471 {
1472     std::lock_guard<std::mutex> autoLock(dataLock_);
1473     if (currentContext_.tableName.empty()) {
1474         return -E_BUSY;
1475     }
1476     tableName = currentContext_.tableName;
1477     return E_OK;
1478 }
1479 
GetCurrentCommonTaskNum()1480 size_t CloudSyncer::GetCurrentCommonTaskNum()
1481 {
1482     size_t queuedTaskNum = taskQueue_.count(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1483     auto rang = taskQueue_.equal_range(CloudDbConstant::COMMON_TASK_PRIORITY_LEVEL);
1484     size_t compensatedTaskNum = 0;
1485     for (auto it = rang.first; it != rang.second; ++it) {
1486         compensatedTaskNum += cloudTaskInfos_[it->second].compensatedTask ? 1 : 0;
1487     }
1488     return queuedTaskNum - compensatedTaskNum;
1489 }
1490 
CheckQueueSizeWithNoLock(bool priorityTask)1491 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1492 {
1493     int32_t limit = queuedManualSyncLimit_;
1494     size_t totalTaskNum = taskQueue_.size();
1495     size_t commonTaskNum = GetCurrentCommonTaskNum();
1496     size_t queuedTaskNum = priorityTask ? totalTaskNum - commonTaskNum : commonTaskNum;
1497     if (queuedTaskNum >= static_cast<size_t>(limit)) {
1498         std::string priorityName = priorityTask ? CLOUD_PRIORITY_TASK_STRING : CLOUD_COMMON_TASK_STRING;
1499         LOGW("[CloudSyncer] too much %s sync task", priorityName.c_str());
1500         return -E_BUSY;
1501     }
1502     return E_OK;
1503 }
1504 
PrepareSync(TaskId taskId)1505 int CloudSyncer::PrepareSync(TaskId taskId)
1506 {
1507     std::lock_guard<std::mutex> autoLock(dataLock_);
1508     if (hasKvRemoveTask) {
1509         return -E_CLOUD_ERROR;
1510     }
1511     if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1512         LOGW("[CloudSyncer] Abort sync because syncer is closed");
1513         return -E_DB_CLOSED;
1514     }
1515     if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1516         LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1517         return -E_DB_CLOSED;
1518     }
1519     currentContext_.currentTaskId = taskId;
1520     cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1521     cloudTaskInfos_[taskId].pause = false;
1522     cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1523     if (cloudTaskInfos_[taskId].resume) {
1524         auto tempLocker = currentContext_.locker;
1525         currentContext_ = resumeTaskInfos_[taskId].context;
1526         currentContext_.locker = tempLocker;
1527     } else {
1528         currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1529         currentContext_.strategy =
1530             StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, isKvScene_, policy_);
1531         currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1532             cloudTaskInfos_[taskId].users);
1533         currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1534     }
1535     LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64 " priority[%d] compensated[%d] logicDelete[%d]",
1536         cloudTaskInfos_[taskId].storeId.c_str(), taskId, static_cast<int>(cloudTaskInfos_[taskId].priorityTask),
1537         static_cast<int>(cloudTaskInfos_[taskId].compensatedTask),
1538         static_cast<int>(storageProxy_->IsCurrentLogicDelete()));
1539     return E_OK;
1540 }
1541 
LockCloud(TaskId taskId)1542 int CloudSyncer::LockCloud(TaskId taskId)
1543 {
1544     int period;
1545     {
1546         auto res = cloudDB_.Lock();
1547         if (res.first != E_OK) {
1548             return res.first;
1549         }
1550         period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1551     }
1552     int errCode = StartHeartBeatTimer(period, taskId);
1553     if (errCode != E_OK) {
1554         UnlockCloud();
1555     }
1556     return errCode;
1557 }
1558 
UnlockCloud()1559 int CloudSyncer::UnlockCloud()
1560 {
1561     FinishHeartBeatTimer();
1562     return cloudDB_.UnLock();
1563 }
1564 
StartHeartBeatTimer(int period,TaskId taskId)1565 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1566 {
1567     if (timerId_ != 0u) {
1568         LOGW("[CloudSyncer] HeartBeat timer has been start!");
1569         return E_OK;
1570     }
1571     TimerId timerId = 0;
1572     int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1573         HeartBeat(timerId, taskId);
1574         return E_OK;
1575     }, nullptr, timerId);
1576     if (errCode != E_OK) {
1577         LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1578         return errCode;
1579     }
1580     timerId_ = timerId;
1581     return E_OK;
1582 }
1583 
FinishHeartBeatTimer()1584 void CloudSyncer::FinishHeartBeatTimer()
1585 {
1586     if (timerId_ == 0u) {
1587         return;
1588     }
1589     RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1590     timerId_ = 0u;
1591     LOGD("[CloudSyncer] Finish heartbeat timer ok");
1592 }
1593 
HeartBeat(TimerId timerId,TaskId taskId)1594 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1595 {
1596     if (timerId_ != timerId) {
1597         return;
1598     }
1599     IncObjRef(this);
1600     {
1601         std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1602         heartbeatCount_[taskId]++;
1603     }
1604     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1605         {
1606             std::lock_guard<std::mutex> guard(dataLock_);
1607             if (currentContext_.currentTaskId != taskId) {
1608                 heartbeatCount_.erase(taskId);
1609                 failedHeartbeatCount_.erase(taskId);
1610                 DecObjRef(this);
1611                 return;
1612             }
1613         }
1614         if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
1615             // heartbeat block twice should finish task now
1616             SetTaskFailed(taskId, -E_CLOUD_ERROR);
1617         } else {
1618             int ret = cloudDB_.HeartBeat();
1619             if (ret != E_OK) {
1620                 HeartBeatFailed(taskId, ret);
1621             } else {
1622                 failedHeartbeatCount_[taskId] = 0;
1623             }
1624         }
1625         {
1626             std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1627             heartbeatCount_[taskId]--;
1628             if (currentContext_.currentTaskId != taskId) {
1629                 heartbeatCount_.erase(taskId);
1630                 failedHeartbeatCount_.erase(taskId);
1631             }
1632         }
1633         DecObjRef(this);
1634     });
1635     if (errCode != E_OK) {
1636         LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1637         DecObjRef(this);
1638     }
1639 }
1640 
HeartBeatFailed(TaskId taskId,int errCode)1641 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1642 {
1643     failedHeartbeatCount_[taskId]++;
1644     if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1645         return;
1646     }
1647     LOGW("[CloudSyncer] heartbeat failed too much times!");
1648     FinishHeartBeatTimer();
1649     SetTaskFailed(taskId, errCode);
1650 }
1651 
SetTaskFailed(TaskId taskId,int errCode)1652 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1653 {
1654     std::lock_guard<std::mutex> autoLock(dataLock_);
1655     if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1656         return;
1657     }
1658     if (cloudTaskInfos_[taskId].errCode != E_OK) {
1659         return;
1660     }
1661     cloudTaskInfos_[taskId].errCode = errCode;
1662 }
1663 
GetCloudSyncTaskCount()1664 int32_t CloudSyncer::GetCloudSyncTaskCount()
1665 {
1666     std::lock_guard<std::mutex> autoLock(dataLock_);
1667     return static_cast<int32_t>(taskQueue_.size());
1668 }
1669 
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1670 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1671     const RelationalSchemaObject &localSchema)
1672 {
1673     std::lock_guard<std::mutex> lock(syncMutex_);
1674     int index = 1;
1675     for (const auto &tableName: tableNameList) {
1676         LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1677         int ret = storageProxy_->CleanWaterMark(tableName);
1678         if (ret != E_OK) {
1679         LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1680             return ret;
1681         }
1682         index++;
1683     }
1684     int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1685     if (errCode != E_OK) {
1686         LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1687         return errCode;
1688     }
1689 
1690     std::vector<Asset> assets;
1691     errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1692     if (errCode != E_OK) {
1693         LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1694         storageProxy_->Rollback();
1695         return errCode;
1696     }
1697     LOGI("[CloudSyncer] Clean cloud data success, mode=%d", mode);
1698     if (!assets.empty() && mode == FLAG_AND_DATA) {
1699         errCode = cloudDB_.RemoveLocalAssets(assets);
1700         if (errCode != E_OK) {
1701             LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1702             storageProxy_->Rollback();
1703             return errCode;
1704         }
1705         LOGI("[CloudSyncer] Remove local asset success, size=%zu", assets.size());
1706     }
1707 
1708     storageProxy_->Commit();
1709     return errCode;
1710 }
1711 
ClearCloudWatermark(const std::vector<std::string> & tableNameList)1712 int CloudSyncer::ClearCloudWatermark(const std::vector<std::string> &tableNameList)
1713 {
1714     std::lock_guard<std::mutex> lock(syncMutex_);
1715     return CloudSyncUtils::ClearCloudWatermark(tableNameList, storageProxy_);
1716 }
1717 
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1718 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1719 {
1720     for (const auto &tableName: tableNameList) {
1721         int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1722         if (ret != E_OK) {
1723             LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1724             return ret;
1725         }
1726     }
1727     return E_OK;
1728 }
1729 
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1730 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam &param)
1731 {
1732     LOGI("[CloudSyncer] save cloud water [%s] of table [%s length[%u]]", param.cloudWaterMark.c_str(),
1733         DBCommon::StringMiddleMasking(param.tableName).c_str(), param.tableName.length());
1734     {
1735         std::lock_guard<std::mutex> autoLock(dataLock_);
1736         if (cloudTaskInfos_[taskId].isAssetsOnly) {
1737             LOGI("[CloudSyncer] assets only task not save water mark.");
1738             return;
1739         }
1740         currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1741     }
1742 }
1743 
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1744 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1745     DownloadCommitList &commitList, int errCode)
1746 {
1747     if (commitList.empty()) {
1748         return E_OK;
1749     }
1750     uint32_t successCount = 0u;
1751     int ret = E_OK;
1752     if (info.isAsyncDownload) {
1753         ret = HandleDownloadResultForAsyncDownload(downloadItem, info, commitList, successCount);
1754     } else {
1755         ret = HandleDownloadResult(downloadItem, info, commitList, successCount);
1756     }
1757     if (errCode == E_OK) {
1758         info.downLoadInfo.failCount += (commitList.size() - successCount);
1759         info.downLoadInfo.successCount -= (commitList.size() - successCount);
1760     }
1761     if (ret != E_OK) {
1762         LOGE("Commit download result failed.%d", ret);
1763     }
1764     commitList.clear();
1765     return ret;
1766 }
1767 
GetIdentify() const1768 std::string CloudSyncer::GetIdentify() const
1769 {
1770     return id_;
1771 }
1772 
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1773 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult)
1774 {
1775     strategyOpResult = OpType::NOT_HANDLE;
1776     // ignore same record with local generate data
1777     if (dataInfo.localInfo.logInfo.device.empty() &&
1778         !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1779         // not handle same data
1780         return E_OK;
1781     }
1782     {
1783         std::lock_guard<std::mutex> autoLock(dataLock_);
1784         if (!currentContext_.strategy) {
1785             LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1786             return -E_INTERNAL_ERROR;
1787         }
1788         bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1789             dataInfo.cloudLogInfo, policy_);
1790         strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1791             dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1792     }
1793     if (strategyOpResult == OpType::DELETE) {
1794         param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1795     }
1796     return E_OK;
1797 }
1798 
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1799 int CloudSyncer::GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
1800     std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1801 {
1802     int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index], true,
1803         logInfo, localAssetInfo);
1804     if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1805         return errCode;
1806     }
1807     std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1808     if (hashKey.empty()) {
1809         return errCode;
1810     }
1811 
1812     int ret = CheckLocalQueryAssetsOnlyIfNeed(localAssetInfo, param, logInfo);
1813     if (ret != E_OK) {
1814         LOGE("[CloudSyncer] check local assets failed, error code: %d", ret);
1815         return ret;
1816     }
1817 
1818     param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1819     param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1820     if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1821         LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1822             DBCommon::TransferStringToHex(hashKey).c_str());
1823         logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1824         logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1825         logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1826         logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1827         logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1828         logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1829         logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1830         // delete record should remove local asset info
1831         if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1832             localAssetInfo.clear();
1833         }
1834         errCode = E_OK;
1835     }
1836     logInfo.logInfo.isNeedUpdateAsset = CloudSyncUtils::IsNeedUpdateAsset(localAssetInfo);
1837     return errCode;
1838 }
1839 
GetNextTaskId()1840 TaskId CloudSyncer::GetNextTaskId()
1841 {
1842     std::lock_guard<std::mutex> autoLock(dataLock_);
1843     if (!taskQueue_.empty()) {
1844         return taskQueue_.begin()->second;
1845     }
1846     return INVALID_TASK_ID;
1847 }
1848 
MarkCurrentTaskPausedIfNeed(const CloudTaskInfo & taskInfo)1849 void CloudSyncer::MarkCurrentTaskPausedIfNeed(const CloudTaskInfo &taskInfo)
1850 {
1851     // must sure have dataLock_ before call this function.
1852     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1853         return;
1854     }
1855     if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1856         return;
1857     }
1858     if (cloudTaskInfos_[currentContext_.currentTaskId].priorityLevel < taskInfo.priorityLevel) {
1859         cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1860         LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1861     }
1862 }
1863 
SetCurrentTaskFailedWithoutLock(int errCode)1864 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1865 {
1866     if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1867         return;
1868     }
1869     cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1870 }
1871 
LockCloudIfNeed(TaskId taskId)1872 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1873 {
1874     {
1875         std::lock_guard<std::mutex> autoLock(dataLock_);
1876         if (currentContext_.locker != nullptr) {
1877             LOGD("[CloudSyncer] lock exist");
1878             return E_OK;
1879         }
1880     }
1881     std::shared_ptr<CloudLocker> locker = nullptr;
1882     int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1883         return LockCloud(taskId);
1884     }, [this]() {
1885         int unlockCode = UnlockCloud();
1886         if (unlockCode != E_OK) {
1887             SetCurrentTaskFailedWithoutLock(unlockCode);
1888         }
1889     }, locker);
1890     {
1891         std::lock_guard<std::mutex> autoLock(dataLock_);
1892         currentContext_.locker = locker;
1893     }
1894     return errCode;
1895 }
1896 
UnlockIfNeed()1897 void CloudSyncer::UnlockIfNeed()
1898 {
1899     std::shared_ptr<CloudLocker> cacheLocker;
1900     {
1901         std::lock_guard<std::mutex> autoLock(dataLock_);
1902         cacheLocker = currentContext_.locker;
1903         currentContext_.locker = nullptr;
1904     }
1905     // unlock without mutex
1906     cacheLocker = nullptr;
1907 }
1908 
ClearCurrentContextWithoutLock()1909 void CloudSyncer::ClearCurrentContextWithoutLock()
1910 {
1911     heartbeatCount_.erase(currentContext_.currentTaskId);
1912     failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1913     currentContext_.currentTaskId = INVALID_TASK_ID;
1914     currentContext_.notifier = nullptr;
1915     currentContext_.strategy = nullptr;
1916     currentContext_.processRecorder = nullptr;
1917     currentContext_.tableName.clear();
1918     currentContext_.assetDownloadList.clear();
1919     currentContext_.assetFields.clear();
1920     currentContext_.assetsInfo.clear();
1921     currentContext_.cloudWaterMarks.clear();
1922     currentContext_.isNeedUpload = false;
1923     currentContext_.currentUserIndex = 0;
1924     currentContext_.repeatCount = 0;
1925 }
1926 
IsAlreadyHaveCompensatedSyncTask()1927 bool CloudSyncer::IsAlreadyHaveCompensatedSyncTask()
1928 {
1929     std::lock_guard<std::mutex> autoLock(dataLock_);
1930     for (const auto &item : cloudTaskInfos_) {
1931         const auto &taskInfo = item.second;
1932         if (taskInfo.compensatedTask) {
1933             return true;
1934         }
1935     }
1936     return false;
1937 }
1938 
ClearContextAndNotify(TaskId taskId,int errCode)1939 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1940 {
1941     std::shared_ptr<ProcessNotifier> notifier = nullptr;
1942     CloudTaskInfo info;
1943     {
1944         // clear current context
1945         std::lock_guard<std::mutex> autoLock(dataLock_);
1946         notifier = currentContext_.notifier;
1947         ClearCurrentContextWithoutLock();
1948         if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1949             LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1950             contextCv_.notify_all();
1951             return;
1952         }
1953         info = std::move(cloudTaskInfos_[taskId]);
1954         cloudTaskInfos_.erase(taskId);
1955         resumeTaskInfos_.erase(taskId);
1956     }
1957     int err = storageProxy_->ClearUnLockingNoNeedCompensated();
1958     if (err != E_OK) {
1959         // if clear unlocking failed, no return to avoid affecting the entire process
1960         LOGW("[CloudSyncer] clear unlocking status failed! errCode = %d", err);
1961     }
1962     contextCv_.notify_all();
1963     if (info.errCode == E_OK) {
1964         info.errCode = errCode;
1965     }
1966     LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1967         info.errCode);
1968     info.status = ProcessStatus::FINISHED;
1969     if (notifier != nullptr) {
1970         notifier->UpdateAllTablesFinally();
1971         notifier->NotifyProcess(info, {}, true);
1972     }
1973     // generate compensated sync
1974     // if already have compensated sync task in queue, no need to generate new compensated sync task
1975     if (!info.compensatedTask && !IsAlreadyHaveCompensatedSyncTask()) {
1976         CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1977         GenerateCompensatedSync(taskInfo);
1978     }
1979 }
1980 
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1981 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload)
1982 {
1983     int ret = CheckTaskIdValid(taskId);
1984     if (ret != E_OK) {
1985         return ret;
1986     }
1987     ret = DownloadDataFromCloud(taskId, param, isFirstDownload);
1988     if (ret != E_OK) {
1989         return ret;
1990     }
1991     bool downloadAssetOnly = false;
1992     if (param.downloadData.data.empty()) {
1993         if (param.assetsDownloadList.empty()) {
1994             return E_OK;
1995         }
1996         LOGI("[CloudSyncer] Resume task need download assets");
1997         downloadAssetOnly = true;
1998     }
1999     // Save data in transaction, update cloud water mark, notify process and changed data
2000     ret = SaveDataNotifyProcess(taskId, param, downloadAssetOnly);
2001     if (ret == -E_TASK_PAUSED) {
2002         return ret;
2003     }
2004     if (ret != E_OK) {
2005         std::lock_guard<std::mutex> autoLock(dataLock_);
2006         param.info.tableStatus = ProcessStatus::FINISHED;
2007         currentContext_.notifier->UpdateProcess(param.info);
2008         return ret;
2009     }
2010     (void)NotifyInDownload(taskId, param, isFirstDownload);
2011     return SaveCloudWaterMark(param.tableName, taskId);
2012 }
2013 
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)2014 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
2015     DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
2016 {
2017     CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
2018     if (downloadItem.assets.empty()) { // Download data (include deleting)
2019         return E_OK;
2020     }
2021     bool isSharedTable = false;
2022     int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
2023     if (errorCode != E_OK) {
2024         LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
2025         return errorCode;
2026     }
2027     if (!isSharedTable) {
2028         errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
2029         if (errorCode == -E_NOT_SET) {
2030             return -E_NOT_SET;
2031         }
2032     } else {
2033         // share table will not download asset, need to reset the status
2034         for (auto &entry: downloadItem.assets) {
2035             for (auto &asset: entry.second) {
2036                 asset.status = AssetStatus::NORMAL;
2037             }
2038         }
2039     }
2040     ModifyDownLoadInfoCount(errorCode, info);
2041     if (!downloadItem.assets.empty()) {
2042         if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
2043             if (CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy) == OP_BUTT) {
2044                 LOGW("[CloudSyncer] [DownloadOneAssetRecord] strategy is invalid.");
2045             } else {
2046                 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
2047                     downloadItem.primaryKeyValList);
2048             }
2049         } else if (downloadItem.strategy == OpType::INSERT) {
2050             changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
2051         }
2052     }
2053     return errorCode;
2054 }
2055 
GetSyncParamForDownload(TaskId taskId,SyncParam & param)2056 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam &param)
2057 {
2058     if (IsCurrentTableResume(taskId, false)) {
2059         std::lock_guard<std::mutex> autoLock(dataLock_);
2060         if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
2061             param = resumeTaskInfos_[taskId].syncParam;
2062             resumeTaskInfos_[taskId].syncParam = {};
2063             int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2064             if (ret != E_OK) {
2065                 LOGW("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2066             }
2067             return E_OK;
2068         }
2069     }
2070     int ret = GetCurrentTableName(param.tableName);
2071     if (ret != E_OK) {
2072         LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2073         return ret;
2074     }
2075     param.info.tableName = param.tableName;
2076     std::vector<Field> assetFields;
2077     // only no primary key and composite primary key contains rowid.
2078     ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2079     if (ret != E_OK) {
2080         LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2081         return ret;
2082     }
2083     std::shared_ptr<ProcessNotifier> notifier = nullptr;
2084     {
2085         std::lock_guard<std::mutex> autoLock(dataLock_);
2086         currentContext_.assetFields[currentContext_.tableName] = assetFields;
2087         notifier = currentContext_.notifier;
2088     }
2089     param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2090     if (!IsModeForcePull(taskId) && (!IsPriorityTask(taskId) || IsNeedProcessCloudCursor(taskId))) {
2091         ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2092         if (ret != E_OK) {
2093             LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2094         }
2095         if (!IsCurrentTaskResume(taskId)) {
2096             ReloadCloudWaterMarkIfNeed(param.tableName, param.cloudWaterMark);
2097         }
2098     }
2099     notifier->GetDownloadInfoByTableName(param.info);
2100     auto queryObject = GetQuerySyncObject(param.tableName);
2101     param.isAssetsOnly = queryObject.IsAssetsOnly();
2102     param.groupNum = queryObject.GetGroupNum();
2103     param.assetsGroupMap = queryObject.GetAssetsOnlyGroupMap();
2104     param.isVaildForAssetsOnly = queryObject.AssetsOnlyErrFlag() == E_OK;
2105     param.isForcePullAseets = IsModeForcePull(taskId) && queryObject.IsContainQueryNodes();
2106     return ret;
2107 }
2108 
IsCurrentTaskResume(TaskId taskId)2109 bool CloudSyncer::IsCurrentTaskResume(TaskId taskId)
2110 {
2111     std::lock_guard<std::mutex> autoLock(dataLock_);
2112     return cloudTaskInfos_[taskId].resume;
2113 }
2114 
IsCurrentTableResume(TaskId taskId,bool upload)2115 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2116 {
2117     std::lock_guard<std::mutex> autoLock(dataLock_);
2118     if (!cloudTaskInfos_[taskId].resume) {
2119         return false;
2120     }
2121     if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2122         return false;
2123     }
2124     return upload == resumeTaskInfos_[taskId].upload;
2125 }
2126 
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool isFirstDownload)2127 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool isFirstDownload)
2128 {
2129     // Get cloud data after cloud water mark
2130     param.info.tableStatus = ProcessStatus::PROCESSING;
2131     param.downloadData = {};
2132     if (param.isAssetsOnly) {
2133         param.cloudWaterMarkForAssetsOnly = param.cloudWaterMark;
2134     }
2135     int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2136     {
2137         std::lock_guard<std::mutex> autoLock(dataLock_);
2138         CloudSyncUtils::CheckQueryCloudData(
2139             cloudTaskInfos_[taskId].prepareTraceId, param.downloadData, param.pkColNames);
2140     }
2141     if (ret == -E_QUERY_END) {
2142         // Won't break here since downloadData may not be null
2143         param.isLastBatch = true;
2144     } else if (ret != E_OK) {
2145         std::lock_guard<std::mutex> autoLock(dataLock_);
2146         param.info.tableStatus = ProcessStatus::FINISHED;
2147         currentContext_.notifier->UpdateProcess(param.info);
2148         return ret;
2149     }
2150 
2151     ret = CheckCloudQueryAssetsOnlyIfNeed(taskId, param);
2152     if (ret != E_OK) {
2153         LOGE("[CloudSyncer] query assets failed, error code: %d", ret);
2154         std::lock_guard<std::mutex> autoLock(dataLock_);
2155         param.info.tableStatus = ProcessStatus::FINISHED;
2156         currentContext_.notifier->UpdateProcess(param.info);
2157         return ret;
2158     }
2159 
2160     if (param.downloadData.data.empty()) {
2161         if (ret == E_OK || isFirstDownload) {
2162             LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2163             UpdateCloudWaterMark(taskId, param);
2164             // Cloud water may change on the cloud, it needs to be saved here
2165             SaveCloudWaterMark(param.tableName, taskId);
2166         }
2167         if (isFirstDownload) {
2168             NotifyInEmptyDownload(taskId, param.info);
2169         }
2170     }
2171     return E_OK;
2172 }
2173 
ModifyDownLoadInfoCount(const int errorCode,InnerProcessInfo & info)2174 void CloudSyncer::ModifyDownLoadInfoCount(const int errorCode, InnerProcessInfo &info)
2175 {
2176     if (errorCode == E_OK) {
2177         return;
2178     }
2179     info.downLoadInfo.failCount += 1;
2180     if (info.downLoadInfo.successCount == 0) {
2181         LOGW("[CloudSyncer] Invalid successCount");
2182     } else {
2183         info.downLoadInfo.successCount -= 1;
2184     }
2185 }
2186 } // namespace DistributedDB
2187