1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <unordered_map>
19 #include <utility>
20
21 #include "cloud/asset_operation_utils.h"
22 #include "cloud/cloud_db_constant.h"
23 #include "cloud/cloud_storage_utils.h"
24 #include "cloud/icloud_db.h"
25 #include "cloud_sync_tag_assets.h"
26 #include "cloud_sync_utils.h"
27 #include "db_dfx_adapter.h"
28 #include "db_errno.h"
29 #include "log_print.h"
30 #include "runtime_context.h"
31 #include "storage_proxy.h"
32 #include "store_types.h"
33 #include "strategy_factory.h"
34 #include "version.h"
35
36 namespace DistributedDB {
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy,SingleVerConflictResolvePolicy policy)37 CloudSyncer::CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, SingleVerConflictResolvePolicy policy)
38 : lastTaskId_(INVALID_TASK_ID),
39 storageProxy_(std::move(storageProxy)),
40 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
41 closed_(false),
42 timerId_(0u),
43 policy_(policy)
44 {
45 if (storageProxy_ != nullptr) {
46 id_ = storageProxy_->GetIdentify();
47 }
48 InitCloudSyncStateMachine();
49 }
50
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)51 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
52 const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
53 {
54 CloudTaskInfo taskInfo;
55 taskInfo.mode = mode;
56 taskInfo.table = tables;
57 taskInfo.callback = callback;
58 taskInfo.timeout = waitTime;
59 taskInfo.devices = devices;
60 for (const auto &item: tables) {
61 QuerySyncObject syncObject;
62 syncObject.SetTableName(item);
63 taskInfo.queryList.push_back(syncObject);
64 }
65 return Sync(taskInfo);
66 }
67
Sync(const CloudTaskInfo & taskInfo)68 int CloudSyncer::Sync(const CloudTaskInfo &taskInfo)
69 {
70 int errCode = CloudSyncUtils::CheckParamValid(taskInfo.devices, taskInfo.mode);
71 if (errCode != E_OK) {
72 return errCode;
73 }
74 if (cloudDB_.IsNotExistCloudDB()) {
75 LOGE("[CloudSyncer] Not set cloudDB!");
76 return -E_CLOUD_ERROR;
77 }
78 if (closed_) {
79 LOGE("[CloudSyncer] DB is closed!");
80 return -E_DB_CLOSED;
81 }
82 CloudTaskInfo info = taskInfo;
83 info.status = ProcessStatus::PREPARED;
84 errCode = TryToAddSyncTask(std::move(info));
85 if (errCode != E_OK) {
86 return errCode;
87 }
88 if (taskInfo.priorityTask) {
89 MarkCurrentTaskPausedIfNeed();
90 }
91 return TriggerSync();
92 }
93
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)94 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
95 {
96 cloudDB_.SetCloudDB(cloudDB);
97 LOGI("[CloudSyncer] SetCloudDB finish");
98 }
99
GetCloudDB() const100 const std::map<std::string, std::shared_ptr<ICloudDb>> CloudSyncer::GetCloudDB() const
101 {
102 return cloudDB_.GetCloudDB();
103 }
104
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)105 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
106 {
107 storageProxy_->SetIAssetLoader(loader);
108 cloudDB_.SetIAssetLoader(loader);
109 LOGI("[CloudSyncer] SetIAssetLoader finish");
110 }
111
Close()112 void CloudSyncer::Close()
113 {
114 closed_ = true;
115 CloudSyncer::TaskId currentTask;
116 {
117 std::lock_guard<std::mutex> autoLock(dataLock_);
118 currentTask = currentContext_.currentTaskId;
119 }
120 // mark current task db_closed
121 SetTaskFailed(currentTask, -E_DB_CLOSED);
122 UnlockIfNeed();
123 cloudDB_.Close();
124 WaitCurTaskFinished();
125
126 // copy all task from queue
127 std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
128 for (auto &info: infoList) {
129 LOGI("[CloudSyncer] finished taskId %" PRIu64 " with db closed.", info.taskId);
130 }
131 storageProxy_->Close();
132 }
133
StopAllTasks()134 void CloudSyncer::StopAllTasks()
135 {
136 CloudSyncer::TaskId currentTask;
137 {
138 std::lock_guard<std::mutex> autoLock(dataLock_);
139 currentTask = currentContext_.currentTaskId;
140 }
141 // mark current task user_change
142 SetTaskFailed(currentTask, -E_USER_CHANGE);
143 UnlockIfNeed();
144 WaitCurTaskFinished();
145
146 std::vector<CloudTaskInfo> infoList = CopyAndClearTaskInfos();
147 for (auto &info: infoList) {
148 LOGI("[CloudSyncer] finished taskId %" PRIu64 " with user changed.", info.taskId);
149 auto processNotifier = std::make_shared<ProcessNotifier>(this);
150 processNotifier->Init(info.table, info.devices, info.users);
151 info.errCode = -E_USER_CHANGE;
152 info.status = ProcessStatus::FINISHED;
153 processNotifier->NotifyProcess(info, {}, true);
154 }
155 }
156
TriggerSync()157 int CloudSyncer::TriggerSync()
158 {
159 if (closed_) {
160 return -E_DB_CLOSED;
161 }
162 RefObject::IncObjRef(this);
163 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
164 DoSyncIfNeed();
165 RefObject::DecObjRef(this);
166 });
167 if (errCode != E_OK) {
168 LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
169 RefObject::DecObjRef(this);
170 }
171 return errCode;
172 }
173
SetProxyUser(const std::string & user)174 void CloudSyncer::SetProxyUser(const std::string &user)
175 {
176 std::lock_guard<std::mutex> autoLock(dataLock_);
177 storageProxy_->SetUser(user);
178 currentContext_.notifier->SetUser(user);
179 currentContext_.currentUserIndex = currentContext_.currentUserIndex + 1;
180 cloudDB_.SwitchCloudDB(user);
181 }
182
DoSyncIfNeed()183 void CloudSyncer::DoSyncIfNeed()
184 {
185 if (closed_) {
186 return;
187 }
188 // do all sync task in this loop
189 do {
190 // get taskId from queue
191 TaskId triggerTaskId = GetNextTaskId();
192 if (triggerTaskId == INVALID_TASK_ID) {
193 LOGD("[CloudSyncer] task queue empty");
194 break;
195 }
196 // pop taskId in queue
197 if (PrepareSync(triggerTaskId) != E_OK) {
198 break;
199 }
200 // do sync logic
201 std::vector<std::string> usersList;
202 {
203 std::lock_guard<std::mutex> autoLock(dataLock_);
204 usersList = cloudTaskInfos_[triggerTaskId].users;
205 currentContext_.currentUserIndex = 0;
206 }
207 int errCode = E_OK;
208 if (usersList.empty()) {
209 SetProxyUser("");
210 errCode = DoSync(triggerTaskId);
211 } else {
212 for (const auto &user : usersList) {
213 SetProxyUser(user);
214 errCode = DoSync(triggerTaskId);
215 }
216 }
217 LOGD("[CloudSyncer] DoSync finished, errCode %d", errCode);
218 } while (!closed_);
219 LOGD("[CloudSyncer] DoSyncIfNeed finished, closed status %d", static_cast<int>(closed_));
220 }
221
DoSync(TaskId taskId)222 int CloudSyncer::DoSync(TaskId taskId)
223 {
224 std::lock_guard<std::mutex> lock(syncMutex_);
225 ResetCurrentTableUploadBatchIndex();
226 CloudTaskInfo taskInfo;
227 {
228 std::lock_guard<std::mutex> autoLock(dataLock_);
229 taskInfo = cloudTaskInfos_[taskId];
230 cloudDB_.SetPrepareTraceId(taskInfo.prepareTraceId); // SetPrepareTraceId before task started
231 LOGD("[CloudSyncer] DoSync get taskInfo, taskId is: %llu, prepareTraceId is:%s.",
232 static_cast<unsigned long long>(taskInfo.taskId), taskInfo.prepareTraceId.c_str());
233 }
234 bool needUpload = true;
235 bool isNeedFirstDownload = false;
236 {
237 std::lock_guard<std::mutex> autoLock(dataLock_);
238 needUpload = currentContext_.strategy->JudgeUpload();
239 // 1. if the locker is already exist, directly reuse the lock, no need do the first download
240 // 2. if the task(resume task) is already be tagged need upload data, no need do the first download
241 isNeedFirstDownload = (currentContext_.locker == nullptr) && (!currentContext_.isNeedUpload);
242 }
243 int errCode = E_OK;
244 bool isFirstDownload = true;
245 if (isNeedFirstDownload) {
246 // do first download
247 errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
248 SetTaskFailed(taskId, errCode);
249 if (errCode != E_OK) {
250 SyncMachineDoFinished();
251 return errCode;
252 }
253 bool isActuallyNeedUpload = false; // whether the task actually has data to upload
254 {
255 std::lock_guard<std::mutex> autoLock(dataLock_);
256 isActuallyNeedUpload = currentContext_.isNeedUpload;
257 }
258 if (!isActuallyNeedUpload) {
259 LOGI("[CloudSyncer] no table need upload!");
260 SyncMachineDoFinished();
261 return E_OK;
262 }
263 isFirstDownload = false;
264 }
265
266 {
267 std::lock_guard<std::mutex> autoLock(dataLock_);
268 currentContext_.isFirstDownload = isFirstDownload;
269 currentContext_.isRealNeedUpload = needUpload;
270 }
271 errCode = DoSyncInner(taskInfo);
272 return errCode;
273 }
274
PrepareAndUpload(const CloudTaskInfo & taskInfo,size_t index)275 int CloudSyncer::PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index)
276 {
277 {
278 std::lock_guard<std::mutex> autoLock(dataLock_);
279 currentContext_.tableName = taskInfo.table[index];
280 }
281 int errCode = CheckTaskIdValid(taskInfo.taskId);
282 if (errCode != E_OK) {
283 LOGE("[CloudSyncer] task is invalid, abort sync");
284 return errCode;
285 }
286 if (taskInfo.table.empty()) {
287 LOGE("[CloudSyncer] Invalid taskInfo table");
288 return -E_INVALID_ARGS;
289 }
290 errCode = DoUpload(taskInfo.taskId, index == (taskInfo.table.size() - 1u), taskInfo.lockAction);
291 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
292 {
293 std::lock_guard<std::mutex> autoLock(dataLock_);
294 currentContext_.processRecorder->MarkDownloadFinish(currentContext_.currentUserIndex,
295 taskInfo.table[index], false);
296 LOGI("[CloudSyncer] upload version conflict, index:%zu", index);
297 }
298 return errCode;
299 }
300 if (errCode != E_OK) {
301 LOGE("[CloudSyncer] upload failed %d", errCode);
302 return errCode;
303 }
304 return errCode;
305 }
306
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)307 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
308 {
309 if (!needUpload) {
310 return E_OK;
311 }
312 int errCode = storageProxy_->StartTransaction();
313 if (errCode != E_OK) {
314 LOGE("[CloudSyncer] start transaction failed before doing upload.");
315 return errCode;
316 }
317 for (size_t i = 0u; i < taskInfo.table.size(); ++i) {
318 LOGD("[CloudSyncer] try upload table, index: %zu", i);
319 if (IsTableFinishInUpload(taskInfo.table[i])) {
320 continue;
321 }
322 errCode = PrepareAndUpload(taskInfo, i);
323 if (errCode == -E_TASK_PAUSED) { // should re download paused table
324 MarkDownloadFinishIfNeed(taskInfo.table[i], false);
325 }
326 if (errCode != E_OK) {
327 break;
328 }
329 MarkUploadFinishIfNeed(taskInfo.table[i]);
330 }
331 if (errCode == -E_TASK_PAUSED) {
332 std::lock_guard<std::mutex> autoLock(dataLock_);
333 resumeTaskInfos_[taskInfo.taskId].upload = true;
334 }
335 if (errCode == E_OK || errCode == -E_TASK_PAUSED) {
336 int commitErrorCode = storageProxy_->Commit();
337 if (commitErrorCode != E_OK) {
338 LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
339 }
340 } else {
341 int rollBackErrorCode = storageProxy_->Rollback();
342 if (rollBackErrorCode != E_OK) {
343 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
344 }
345 }
346 return errCode;
347 }
348
SyncMachineDoDownload()349 CloudSyncEvent CloudSyncer::SyncMachineDoDownload()
350 {
351 CloudTaskInfo taskInfo;
352 bool needUpload;
353 bool isFirstDownload;
354 {
355 std::lock_guard<std::mutex> autoLock(dataLock_);
356 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
357 needUpload = currentContext_.isRealNeedUpload;
358 isFirstDownload = currentContext_.isFirstDownload;
359 }
360 int errCode = E_OK;
361 if (IsLockInDownload()) {
362 errCode = LockCloudIfNeed(taskInfo.taskId);
363 }
364 if (errCode != E_OK) {
365 return SetCurrentTaskFailedInMachine(errCode);
366 }
367 errCode = DoDownloadInNeed(taskInfo, needUpload, isFirstDownload);
368 if (errCode != E_OK) {
369 if (errCode == -E_TASK_PAUSED) {
370 DBDfxAdapter::ReportBehavior(
371 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::CANCLE, errCode});
372 } else {
373 DBDfxAdapter::ReportBehavior(
374 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::FAIL, errCode});
375 }
376 return SetCurrentTaskFailedInMachine(errCode);
377 }
378 DBDfxAdapter::ReportBehavior(
379 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_DOWNLOAD, StageResult::SUCC, errCode});
380 return CloudSyncEvent::DOWNLOAD_FINISHED_EVENT;
381 }
382
SyncMachineDoUpload()383 CloudSyncEvent CloudSyncer::SyncMachineDoUpload()
384 {
385 CloudTaskInfo taskInfo;
386 bool needUpload;
387 {
388 std::lock_guard<std::mutex> autoLock(dataLock_);
389 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
390 needUpload = currentContext_.isRealNeedUpload;
391 }
392 DBDfxAdapter::ReportBehavior(
393 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_UPLOAD, StageResult::SUCC, E_OK});
394 int errCode = DoUploadInNeed(taskInfo, needUpload);
395 if (errCode == -E_CLOUD_VERSION_CONFLICT) {
396 DBDfxAdapter::ReportBehavior(
397 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
398 return CloudSyncEvent::REPEAT_CHECK_EVENT;
399 }
400 if (errCode != E_OK) {
401 {
402 std::lock_guard<std::mutex> autoLock(dataLock_);
403 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
404 }
405 if (errCode == -E_TASK_PAUSED) {
406 DBDfxAdapter::ReportBehavior(
407 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::CANCLE, errCode});
408 } else {
409 DBDfxAdapter::ReportBehavior(
410 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::FAIL, errCode});
411 }
412 return CloudSyncEvent::ERROR_EVENT;
413 }
414 DBDfxAdapter::ReportBehavior(
415 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_UPLOAD, StageResult::SUCC, errCode});
416 return CloudSyncEvent::UPLOAD_FINISHED_EVENT;
417 }
418
SyncMachineDoFinished()419 CloudSyncEvent CloudSyncer::SyncMachineDoFinished()
420 {
421 UnlockIfNeed();
422 TaskId taskId;
423 int errCode;
424 int currentUserIndex;
425 int userListSize;
426 {
427 std::lock_guard<std::mutex> autoLock(dataLock_);
428 taskId = currentContext_.currentTaskId;
429 errCode = cloudTaskInfos_[currentContext_.currentTaskId].errCode;
430 cloudTaskInfos_[currentContext_.currentTaskId].errCode = E_OK;
431 currentUserIndex = currentContext_.currentUserIndex;
432 userListSize = static_cast<int>(cloudTaskInfos_[taskId].users.size());
433 }
434 if (currentUserIndex >= userListSize) {
435 DoFinished(taskId, errCode);
436 } else {
437 CloudTaskInfo taskInfo;
438 {
439 std::lock_guard<std::mutex> autoLock(dataLock_);
440 taskInfo = cloudTaskInfos_[currentContext_.currentTaskId];
441 }
442 taskInfo.status = ProcessStatus::FINISHED;
443 currentContext_.notifier->NotifyProcess(taskInfo, {});
444 }
445 return CloudSyncEvent::ALL_TASK_FINISHED_EVENT;
446 }
447
DoSyncInner(const CloudTaskInfo & taskInfo)448 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo)
449 {
450 cloudSyncStateMachine_.SwitchStateAndStep(CloudSyncEvent::START_SYNC_EVENT);
451 DBDfxAdapter::ReportBehavior(
452 {__func__, Scene::CLOUD_SYNC, State::BEGIN, Stage::CLOUD_SYNC, StageResult::SUCC, E_OK});
453 return E_OK;
454 }
455
DoFinished(TaskId taskId,int errCode)456 void CloudSyncer::DoFinished(TaskId taskId, int errCode)
457 {
458 storageProxy_->OnSyncFinish();
459 if (errCode == -E_TASK_PAUSED) {
460 DBDfxAdapter::ReportBehavior(
461 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_SYNC, StageResult::CANCLE, errCode});
462 LOGD("[CloudSyncer] taskId %" PRIu64 " was paused, it won't be finished now", taskId);
463 {
464 std::lock_guard<std::mutex> autoLock(dataLock_);
465 resumeTaskInfos_[taskId].context = std::move(currentContext_);
466 currentContext_.locker = resumeTaskInfos_[taskId].context.locker;
467 resumeTaskInfos_[taskId].context.locker = nullptr;
468 ClearCurrentContextWithoutLock();
469 }
470 contextCv_.notify_one();
471 return;
472 }
473 {
474 std::lock_guard<std::mutex> autoLock(dataLock_);
475 taskQueue_.remove(taskId);
476 priorityTaskQueue_.remove(taskId);
477 }
478 ClearContextAndNotify(taskId, errCode);
479 }
480
481 /**
482 * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
483 */
UpdateChangedData(SyncParam & param,DownloadList & assetsDownloadList)484 int CloudSyncer::UpdateChangedData(SyncParam ¶m, DownloadList &assetsDownloadList)
485 {
486 if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
487 return E_OK;
488 }
489 int ret = E_OK;
490 for (size_t j : param.withoutRowIdData.insertData) {
491 VBucket &datum = param.downloadData.data[j];
492 std::vector<Type> primaryValues;
493 ret = CloudSyncUtils::GetCloudPkVals(datum, param.changedData.field,
494 std::get<int64_t>(datum[CloudDbConstant::ROW_ID_FIELD_NAME]), primaryValues);
495 if (ret != E_OK) {
496 LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
497 return ret;
498 }
499 param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
500 }
501 for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
502 size_t downloadIndex = std::get<0>(tuple);
503 VBucket &datum = param.downloadData.data[downloadIndex];
504 size_t insertIdx = std::get<1>(tuple);
505 std::vector<Type> &pkVal = std::get<5>(assetsDownloadList[insertIdx]); // 5 means primary key list
506 pkVal[0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME];
507 }
508 for (const auto &tuple : param.withoutRowIdData.updateData) {
509 size_t downloadIndex = std::get<0>(tuple);
510 size_t updateIndex = std::get<1>(tuple);
511 VBucket &datum = param.downloadData.data[downloadIndex];
512 size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
513 if (updateIndex >= size) {
514 LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
515 return -E_INTERNAL_ERROR;
516 }
517 if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
518 LOGE("[CloudSyncer] primary key value list should not be empty.");
519 return -E_INTERNAL_ERROR;
520 }
521 // no primary key or composite primary key, the first element is rowid
522 param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
523 datum[CloudDbConstant::ROW_ID_FIELD_NAME];
524 }
525 return ret;
526 }
527
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)528 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
529 {
530 for (const auto &assetField : assetFields) {
531 if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
532 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
533 return true;
534 }
535 }
536 }
537 return false;
538 }
539
IsDataContainAssets()540 bool CloudSyncer::IsDataContainAssets()
541 {
542 std::lock_guard<std::mutex> autoLock(dataLock_);
543 bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
544 if (!hasTable) {
545 LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exist in currentContext, %d.",
546 -E_INTERNAL_ERROR);
547 return false;
548 }
549 if (currentContext_.assetFields[currentContext_.tableName].empty()) {
550 return false;
551 }
552 return true;
553 }
554
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,int & errCode)555 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
556 bool setNormalStatus, int &errCode)
557 {
558 // Define a map to store the result
559 std::map<std::string, Assets> res = {};
560 std::vector<Field> assetFields;
561 {
562 std::lock_guard<std::mutex> autoLock(dataLock_);
563 assetFields = currentContext_.assetFields[currentContext_.tableName];
564 }
565 // For every column contain asset or assets, assetFields are in context
566 for (const Field &assetField : assetFields) {
567 Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus, errCode);
568 if (!assets.empty()) {
569 res[assetField.colName] = assets;
570 }
571 if (errCode != E_OK) {
572 break;
573 }
574 }
575 return res;
576 }
577
FillCloudAssets(const std::string & tableName,VBucket & normalAssets,VBucket & failedAssets)578 int CloudSyncer::FillCloudAssets(const std::string &tableName, VBucket &normalAssets,
579 VBucket &failedAssets)
580 {
581 int ret = E_OK;
582 if (normalAssets.size() > 1) {
583 ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true);
584 if (ret != E_OK) {
585 LOGE("[CloudSyncer] Can not fill normal cloud assets for download");
586 return ret;
587 }
588 }
589 if (failedAssets.size() > 1) {
590 ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false);
591 if (ret != E_OK) {
592 LOGE("[CloudSyncer] Can not fill abnormal assets for download");
593 return ret;
594 }
595 }
596 return E_OK;
597 }
598
HandleDownloadResult(const DownloadItem & downloadItem,const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)599 int CloudSyncer::HandleDownloadResult(const DownloadItem &downloadItem, const std::string &tableName,
600 DownloadCommitList &commitList, uint32_t &successCount)
601 {
602 successCount = 0;
603 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
604 if (errCode != E_OK) {
605 LOGE("[CloudSyncer] start transaction Failed before handle download.");
606 return errCode;
607 }
608 errCode = CommitDownloadAssets(downloadItem, tableName, commitList, successCount);
609 if (errCode != E_OK) {
610 successCount = 0;
611 int ret = E_OK;
612 if (errCode == -E_REMOVE_ASSETS_FAILED) {
613 // remove assets failed no effect to asset status, just commit
614 ret = storageProxy_->Commit();
615 } else {
616 ret = storageProxy_->Rollback();
617 }
618 LOGE("[CloudSyncer] commit download assets failed %d commit/rollback ret %d", errCode, ret);
619 return errCode;
620 }
621 errCode = storageProxy_->Commit();
622 if (errCode != E_OK) {
623 successCount = 0;
624 LOGE("[CloudSyncer] commit failed %d", errCode);
625 }
626 return errCode;
627 }
628
CloudDbDownloadAssets(TaskId taskId,InnerProcessInfo & info,const DownloadList & downloadList,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)629 int CloudSyncer::CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
630 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
631 {
632 int downloadStatus = E_OK;
633 {
634 std::lock_guard<std::mutex> autoLock(dataLock_);
635 downloadStatus = resumeTaskInfos_[taskId].downloadStatus;
636 resumeTaskInfos_[taskId].downloadStatus = E_OK;
637 }
638 int errorCode = E_OK;
639 DownloadCommitList commitList;
640 for (size_t i = GetDownloadAssetIndex(taskId); i < downloadList.size(); i++) {
641 errorCode = CheckTaskIdValid(taskId);
642 if (errorCode != E_OK) {
643 std::lock_guard<std::mutex> autoLock(dataLock_);
644 resumeTaskInfos_[taskId].lastDownloadIndex = i;
645 resumeTaskInfos_[taskId].downloadStatus = downloadStatus;
646 break;
647 }
648 DownloadItem downloadItem;
649 GetDownloadItem(downloadList, i, downloadItem);
650 errorCode = DownloadOneAssetRecord(dupHashKeySet, downloadList, downloadItem, info, changedAssets);
651 if (errorCode == -E_NOT_SET) {
652 info.downLoadInfo.failCount += (downloadList.size() - i);
653 info.downLoadInfo.successCount -= (downloadList.size() - i);
654 return errorCode;
655 }
656 if (downloadItem.strategy == OpType::DELETE) {
657 downloadItem.assets = {};
658 downloadItem.gid = "";
659 }
660 // Process result of each asset
661 commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
662 downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
663 int ret = CommitDownloadResult(downloadItem, info, commitList, errorCode);
664 if (ret != E_OK && ret != -E_REMOVE_ASSETS_FAILED) {
665 return ret;
666 }
667 downloadStatus = downloadStatus == E_OK ? ret : downloadStatus;
668 }
669 LOGD("Download status is %d", downloadStatus);
670 return errorCode == E_OK ? downloadStatus : errorCode;
671 }
672
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)673 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
674 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
675 {
676 if (!IsDataContainAssets()) {
677 return E_OK;
678 }
679 // update changed data info
680 if (!CloudSyncUtils::IsChangeDataEmpty(changedAssets)) {
681 // changedData.primaryData should have no member inside
682 return -E_INVALID_ARGS;
683 }
684 changedAssets.tableName = info.tableName;
685 changedAssets.type = ChangedDataType::ASSET;
686 changedAssets.field = pKColNames;
687
688 // Get AssetDownloadList
689 DownloadList changeList;
690 TaskId taskId;
691 {
692 std::lock_guard<std::mutex> autoLock(dataLock_);
693 changeList = currentContext_.assetDownloadList;
694 taskId = currentContext_.currentTaskId;
695 }
696 // Download data (include deleting) will handle return Code in this situation
697 int ret = CloudDbDownloadAssets(taskId, info, changeList, dupHashKeySet, changedAssets);
698 if (ret != E_OK) {
699 LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
700 }
701 return ret;
702 }
703
GetAssetsFromVBucket(VBucket & data)704 std::map<std::string, Assets> CloudSyncer::GetAssetsFromVBucket(VBucket &data)
705 {
706 std::map<std::string, Assets> assets;
707 std::vector<Field> fields;
708 {
709 std::lock_guard<std::mutex> autoLock(dataLock_);
710 fields = currentContext_.assetFields[currentContext_.tableName];
711 }
712 for (const auto &field : fields) {
713 if (data.find(field.colName) != data.end()) {
714 if (field.type == TYPE_INDEX<Asset> && data[field.colName].index() == TYPE_INDEX<Asset>) {
715 assets[field.colName] = { std::get<Asset>(data[field.colName]) };
716 } else if (field.type == TYPE_INDEX<Assets> && data[field.colName].index() == TYPE_INDEX<Assets>) {
717 assets[field.colName] = std::get<Assets>(data[field.colName]);
718 } else {
719 Assets emptyAssets;
720 assets[field.colName] = emptyAssets;
721 }
722 }
723 }
724 return assets;
725 }
726
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)727 int CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
728 {
729 OpType strategyOpResult = OpType::NOT_HANDLE;
730 int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
731 if (errCode != E_OK) {
732 return errCode;
733 }
734 param.downloadData.opType[idx] = strategyOpResult;
735 if (!IsDataContainAssets()) {
736 return E_OK;
737 }
738 Key hashKey;
739 if (isExist) {
740 hashKey = dataInfo.localInfo.logInfo.hashKey;
741 }
742 return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
743 }
744
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,const DataInfo & dataInfo,VBucket & localAssetInfo)745 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo,
746 VBucket &localAssetInfo)
747 {
748 int ret = E_OK;
749 OpType strategy = param.downloadData.opType[idx];
750 switch (strategy) {
751 case OpType::INSERT:
752 case OpType::UPDATE:
753 case OpType::DELETE:
754 ret = HandleTagAssets(hashKey, dataInfo, idx, param, localAssetInfo);
755 break;
756 case OpType::NOT_HANDLE:
757 case OpType::ONLY_UPDATE_GID:
758 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
759 (void)TagAssetsInSingleRecord(localAssetInfo, param.downloadData.data[idx], true, ret);
760 for (const auto &[col, value]: localAssetInfo) {
761 param.downloadData.data[idx].insert_or_assign(col, value);
762 }
763 break;
764 }
765 default:
766 break;
767 }
768 return ret;
769 }
770
HandleTagAssets(const Key & hashKey,const DataInfo & dataInfo,size_t idx,SyncParam & param,VBucket & localAssetInfo)771 int CloudSyncer::HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m,
772 VBucket &localAssetInfo)
773 {
774 Type prefix;
775 std::vector<Type> pkVals;
776 OpType strategy = param.downloadData.opType[idx];
777 bool isDelStrategy = (strategy == OpType::DELETE);
778 int ret = CloudSyncUtils::GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys :
779 param.downloadData.data[idx], param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
780 if (ret != E_OK) {
781 LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
782 return ret;
783 }
784 prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
785 if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
786 LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
787 return -E_INTERNAL_ERROR;
788 }
789 AssetOperationUtils::FilterDeleteAsset(param.downloadData.data[idx]);
790 std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
791 false, ret);
792 if (ret != E_OK) {
793 LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in download data");
794 return ret;
795 }
796 strategy = CloudSyncUtils::CalOpType(param, idx);
797 if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
798 param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx, param.assetsDownloadList.size()));
799 }
800 param.assetsDownloadList.push_back(
801 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey,
802 pkVals, dataInfo.cloudLogInfo.timestamp));
803 return ret;
804 }
805
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList,std::map<std::string,LogInfo> & localLogInfoCache)806 int CloudSyncer::SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
807 std::map<std::string, LogInfo> &localLogInfoCache)
808 {
809 int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
810 if (ret != E_OK) {
811 LOGE("[CloudSyncer] Invalid download data:%d", ret);
812 return ret;
813 }
814 CloudSyncUtils::ModifyCloudDataTime(param.downloadData.data[idx]);
815 DataInfo dataInfo;
816 VBucket localAssetInfo;
817 bool isExist = true;
818 ret = GetLocalInfo(idx, param, dataInfo.localInfo, localLogInfoCache, localAssetInfo);
819 if (ret == -E_NOT_FOUND) {
820 isExist = false;
821 } else if (ret != E_OK) {
822 LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
823 return ret;
824 }
825 // Get cloudLogInfo from cloud data
826 dataInfo.cloudLogInfo = CloudSyncUtils::GetCloudLogInfo(param.downloadData.data[idx]);
827 // Tag datum to get opType
828 ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
829 if (ret != E_OK) {
830 LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
831 return ret;
832 }
833 CloudSyncUtils::UpdateLocalCache(param.downloadData.opType[idx], dataInfo.cloudLogInfo, dataInfo.localInfo.logInfo,
834 localLogInfoCache);
835 ret = CloudSyncUtils::SaveChangedData(param, idx, dataInfo, deletedList);
836 if (ret != E_OK) {
837 LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
838 }
839 return ret;
840 }
841
SaveData(CloudSyncer::TaskId taskId,SyncParam & param)842 int CloudSyncer::SaveData(CloudSyncer::TaskId taskId, SyncParam ¶m)
843 {
844 if (!CloudSyncUtils::IsChangeDataEmpty(param.changedData)) {
845 LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
846 return -E_INVALID_ARGS;
847 }
848 // Update download batch Info
849 param.info.downLoadInfo.batchIndex += 1;
850 param.info.downLoadInfo.total += param.downloadData.data.size();
851 int ret = E_OK;
852 DownloadList assetsDownloadList;
853 param.assetsDownloadList = assetsDownloadList;
854 param.deletePrimaryKeySet.clear();
855 param.dupHashKeySet.clear();
856 CloudSyncUtils::ClearWithoutData(param);
857 std::vector<std::pair<Key, size_t>> deletedList;
858 // use for record local delete status
859 std::map<std::string, LogInfo> localLogInfoCache;
860 for (size_t i = 0; i < param.downloadData.data.size(); i++) {
861 ret = SaveDatum(param, i, deletedList, localLogInfoCache);
862 if (ret != E_OK) {
863 param.info.downLoadInfo.failCount += param.downloadData.data.size();
864 LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
865 return ret;
866 }
867 }
868 // Save assetsMap into current context
869 {
870 std::lock_guard<std::mutex> autoLock(dataLock_);
871 currentContext_.assetDownloadList = param.assetsDownloadList;
872 }
873 // save the data to the database by batch, downloadData will return rowid when insert data.
874 ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
875 if (ret != E_OK) {
876 param.info.downLoadInfo.failCount += param.downloadData.data.size();
877 LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
878 return ret;
879 }
880 ret = UpdateChangedData(param, currentContext_.assetDownloadList);
881 if (ret != E_OK) {
882 param.info.downLoadInfo.failCount += param.downloadData.data.size();
883 LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
884 return ret;
885 }
886 // Update downloadInfo
887 param.info.downLoadInfo.successCount += param.downloadData.data.size();
888 // Get latest cloudWaterMark
889 VBucket &lastData = param.downloadData.data.back();
890 if (!IsQueryListEmpty(taskId) && param.isLastBatch) {
891 // the last batch of cursor in the conditional query is useless
892 param.cloudWaterMark = {};
893 } else {
894 param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
895 }
896 return UpdateFlagForSavedRecord(param);
897 }
898
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)899 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
900 {
901 // Check Input and Context Validity
902 int ret = CheckTaskIdValid(taskId);
903 if (ret != E_OK) {
904 return ret;
905 }
906 {
907 std::lock_guard<std::mutex> autoLock(dataLock_);
908 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
909 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
910 return -E_INVALID_ARGS;
911 }
912 }
913 if (currentContext_.strategy == nullptr) {
914 LOGE("[CloudSyncer] Strategy has not been initialized");
915 return -E_INVALID_ARGS;
916 }
917 ret = storageProxy_->CheckSchema(tableName);
918 if (ret != E_OK) {
919 LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
920 return ret;
921 }
922 return E_OK;
923 }
924
NeedNotifyChangedData(const ChangedData & changedData)925 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
926 {
927 TaskId taskId;
928 {
929 std::lock_guard<std::mutex> autoLock(dataLock_);
930 taskId = currentContext_.currentTaskId;
931 }
932 if (IsModeForcePush(taskId)) {
933 return false;
934 }
935 // when there have no data been changed, it don't need fill back
936 if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
937 changedData.primaryData[OP_DELETE].empty()) {
938 return false;
939 }
940 return true;
941 }
942
NotifyChangedData(ChangedData && changedData)943 int CloudSyncer::NotifyChangedData(ChangedData &&changedData)
944 {
945 if (!NeedNotifyChangedData(changedData)) {
946 return E_OK;
947 }
948 std::string deviceName;
949 {
950 std::lock_guard<std::mutex> autoLock(dataLock_);
951 std::vector<std::string> devices = currentContext_.notifier->GetDevices();
952 if (devices.empty()) {
953 DBDfxAdapter::ReportBehavior(
954 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, -E_CLOUD_ERROR});
955 LOGE("[CloudSyncer] CurrentContext do not contain device info");
956 return -E_CLOUD_ERROR;
957 }
958 // We use first device name as the target of NotifyChangedData
959 deviceName = devices[0];
960 }
961 int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData));
962 if (ret != E_OK) {
963 DBDfxAdapter::ReportBehavior(
964 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
965 LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
966 }
967 DBDfxAdapter::ReportBehavior(
968 {__func__, Scene::CLOUD_SYNC, State::END, Stage::CLOUD_NOTIFY, StageResult::FAIL, ret});
969 return ret;
970 }
971
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)972 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload)
973 {
974 if (!isFirstDownload && param.downloadData.data.empty()) {
975 // if the second download and there is no download data, do not notify
976 return;
977 }
978 std::lock_guard<std::mutex> autoLock(dataLock_);
979 if (currentContext_.strategy->JudgeUpload()) {
980 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
981 } else {
982 if (param.isLastBatch) {
983 param.info.tableStatus = ProcessStatus::FINISHED;
984 }
985 if (cloudTaskInfos_[taskId].table.back() == param.tableName && param.isLastBatch) {
986 currentContext_.notifier->UpdateProcess(param.info);
987 } else {
988 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], param.info);
989 }
990 }
991 }
992
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)993 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m)
994 {
995 int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
996 if (ret != E_OK) {
997 LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
998 return ret;
999 }
1000 if (!IsModeForcePush(taskId)) {
1001 param.changedData.tableName = param.info.tableName;
1002 param.changedData.field = param.pkColNames;
1003 param.changedData.type = ChangedDataType::DATA;
1004 }
1005 ret = SaveData(taskId, param);
1006 param.insertPk.clear();
1007 if (ret != E_OK) {
1008 LOGE("[CloudSyncer] cannot save data: %d.", ret);
1009 int rollBackErrorCode = storageProxy_->Rollback();
1010 if (rollBackErrorCode != E_OK) {
1011 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1012 } else {
1013 LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1014 }
1015 return ret;
1016 }
1017 ret = storageProxy_->Commit();
1018 if (ret != E_OK) {
1019 LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1020 }
1021 return ret;
1022 }
1023
DoDownloadAssets(bool skipSave,SyncParam & param)1024 int CloudSyncer::DoDownloadAssets(bool skipSave, SyncParam ¶m)
1025 {
1026 // Begin downloading assets
1027 ChangedData changedAssets;
1028 int ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1029 bool isSharedTable = false;
1030 int errCode = storageProxy_->IsSharedTable(param.tableName, isSharedTable);
1031 if (errCode != E_OK) {
1032 LOGE("[CloudSyncer] HandleTagAssets cannot judge the table is a shared table. %d", errCode);
1033 return errCode;
1034 }
1035 if (!isSharedTable) {
1036 (void)NotifyChangedData(std::move(changedAssets));
1037 }
1038 if (ret == -E_TASK_PAUSED) {
1039 LOGD("[CloudSyncer] current task was paused, abort download asset");
1040 std::lock_guard<std::mutex> autoLock(dataLock_);
1041 resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = true;
1042 return ret;
1043 } else if (skipSave) {
1044 std::lock_guard<std::mutex> autoLock(dataLock_);
1045 resumeTaskInfos_[currentContext_.currentTaskId].skipQuery = false;
1046 }
1047 if (ret != E_OK) {
1048 LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1049 }
1050 return ret;
1051 }
1052
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param)1053 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m)
1054 {
1055 ChangedData changedData;
1056 bool skipSave = false;
1057 {
1058 bool currentTableResume = IsCurrentTableResume(taskId, false);
1059 std::lock_guard<std::mutex> autoLock(dataLock_);
1060 if (currentTableResume && resumeTaskInfos_[currentContext_.currentTaskId].skipQuery) {
1061 skipSave = true;
1062 }
1063 }
1064 int ret;
1065 if (!skipSave) {
1066 param.changedData = changedData;
1067 param.downloadData.opType.resize(param.downloadData.data.size());
1068 param.downloadData.existDataKey.resize(param.downloadData.data.size());
1069 param.downloadData.existDataHashKey.resize(param.downloadData.data.size());
1070 (void)storageProxy_->CreateTempSyncTrigger(param.tableName);
1071 ret = SaveDataInTransaction(taskId, param);
1072 (void)storageProxy_->ClearAllTempSyncTrigger();
1073 if (ret != E_OK) {
1074 return ret;
1075 }
1076 // call OnChange to notify changedData object first time (without Assets)
1077 ret = NotifyChangedData(std::move(param.changedData));
1078 if (ret != E_OK) {
1079 LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1080 return ret;
1081 }
1082 }
1083 ret = DoDownloadAssets(skipSave, param);
1084 if (ret != E_OK) {
1085 return ret;
1086 }
1087 UpdateCloudWaterMark(taskId, param);
1088 return E_OK;
1089 }
1090
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1091 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1092 bool lastBatch)
1093 {
1094 CloudTaskInfo taskInfo;
1095 {
1096 std::lock_guard<std::mutex> autoLock(dataLock_);
1097 taskInfo = cloudTaskInfos_[uploadParam.taskId];
1098 }
1099 std::lock_guard<std::mutex> autoLock(dataLock_);
1100 if (uploadParam.lastTable && lastBatch) {
1101 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1102 } else {
1103 currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1104 }
1105 }
1106
DoDownload(CloudSyncer::TaskId taskId,bool isFirstDownload)1107 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload)
1108 {
1109 SyncParam param;
1110 int errCode = GetSyncParamForDownload(taskId, param);
1111 if (errCode != E_OK) {
1112 LOGE("[CloudSyncer] get sync param for download failed %d", errCode);
1113 return errCode;
1114 }
1115 errCode = DoDownloadInner(taskId, param, isFirstDownload);
1116 if (errCode == -E_TASK_PAUSED) {
1117 // No need to handle ret.
1118 int ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1119 if (ret != E_OK) {
1120 LOGE("[DoDownload] Cannot get cloud watermark : %d.", ret);
1121 }
1122 std::lock_guard<std::mutex> autoLock(dataLock_);
1123 resumeTaskInfos_[taskId].syncParam = std::move(param);
1124 }
1125 return errCode;
1126 }
1127
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param,bool isFirstDownload)1128 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload)
1129 {
1130 // Query data by batch until reaching end and not more data need to be download
1131 int ret = PreCheck(taskId, param.info.tableName);
1132 if (ret != E_OK) {
1133 return ret;
1134 }
1135 do {
1136 ret = DownloadOneBatch(taskId, param, isFirstDownload);
1137 if (ret != E_OK) {
1138 return ret;
1139 }
1140 } while (!param.isLastBatch);
1141 return E_OK;
1142 }
1143
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1144 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1145 {
1146 std::lock_guard<std::mutex> autoLock(dataLock_);
1147 if (currentContext_.strategy->JudgeUpload()) {
1148 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1149 } else {
1150 info.tableStatus = FINISHED;
1151 if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1152 currentContext_.notifier->UpdateProcess(info);
1153 } else {
1154 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1155 }
1156 }
1157 }
1158
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1159 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1160 {
1161 int ret = PreCheck(taskId, tableName);
1162 if (ret != E_OK) {
1163 return ret;
1164 }
1165 {
1166 std::lock_guard<std::mutex> autoLock(dataLock_);
1167 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1168 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1169 return -E_INVALID_ARGS;
1170 }
1171 if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1172 (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1173 LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1174 static_cast<int>(cloudTaskInfos_[taskId].mode));
1175 return -E_INVALID_ARGS;
1176 }
1177 }
1178
1179 return ret;
1180 }
1181
SaveUploadData(Info & insertInfo,Info & updateInfo,Info & deleteInfo,CloudSyncData & uploadData,InnerProcessInfo & innerProcessInfo)1182 int CloudSyncer::SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
1183 InnerProcessInfo &innerProcessInfo)
1184 {
1185 int errCode = E_OK;
1186 if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1187 errCode = BatchDelete(deleteInfo, uploadData, innerProcessInfo);
1188 if (errCode != E_OK) {
1189 return errCode;
1190 }
1191 }
1192
1193 if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1194 errCode = BatchUpdate(updateInfo, uploadData, innerProcessInfo);
1195 if (errCode != E_OK) {
1196 return errCode;
1197 }
1198 }
1199
1200 if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1201 errCode = BatchInsert(insertInfo, uploadData, innerProcessInfo);
1202 if (errCode != E_OK) {
1203 return errCode;
1204 }
1205 }
1206
1207 if (!uploadData.lockData.rowid.empty()) {
1208 errCode = storageProxy_->FillCloudLogAndAsset(OpType::LOCKED_NOT_HANDLE, uploadData);
1209 }
1210 return errCode;
1211 }
1212
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1213 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1214 {
1215 int errCode = storageProxy_->FillCloudLogAndAsset(OpType::SET_UPLOADING, uploadData);
1216 if (errCode != E_OK) {
1217 return errCode;
1218 }
1219 Info insertInfo;
1220 Info updateInfo;
1221 Info deleteInfo;
1222 errCode = SaveUploadData(insertInfo, updateInfo, deleteInfo, uploadData, innerProcessInfo);
1223 if (errCode != E_OK) {
1224 return errCode;
1225 }
1226 bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total;
1227 if (lastBatch) {
1228 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1229 }
1230 // After each batch upload successed, call NotifyProcess
1231 NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1232
1233 // if batch upload successed, update local water mark
1234 // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1235 errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1236 if (errCode != E_OK) {
1237 LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1238 }
1239 return errCode;
1240 }
1241
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1242 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1243 {
1244 int errCode = E_OK;
1245 storageProxy_->ReleaseUploadRecord(tableName, uploadParam.mode, uploadParam.localMark);
1246 // if we use local cover cloud strategy, it won't update local water mark also.
1247 if (IsModeForcePush(uploadParam.taskId) || (IsPriorityTask(uploadParam.taskId) &&
1248 !IsQueryListEmpty(uploadParam.taskId))) {
1249 return E_OK;
1250 }
1251 errCode = storageProxy_->PutWaterMarkByMode(tableName, uploadParam.mode, uploadParam.localMark);
1252 if (errCode != E_OK) {
1253 LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1254 }
1255 return errCode;
1256 }
1257
DoUpload(CloudSyncer::TaskId taskId,bool lastTable,LockAction lockAction)1258 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction)
1259 {
1260 std::string tableName;
1261 int ret = GetCurrentTableName(tableName);
1262 if (ret != E_OK) {
1263 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1264 return ret;
1265 }
1266
1267 Timestamp localMark = 0u;
1268 ret = PreCheckUpload(taskId, tableName, localMark);
1269 if (ret != E_OK) {
1270 LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1271 return ret;
1272 }
1273 ReloadWaterMarkIfNeed(taskId, localMark);
1274 storageProxy_->OnUploadStart();
1275
1276 int64_t count = 0;
1277 ret = storageProxy_->GetUploadCount(GetQuerySyncObject(tableName), IsModeForcePush(taskId),
1278 IsCompensatedTask(taskId), IsNeedGetLocalWater(taskId), count);
1279 LOGI("get upload count:%zu", count);
1280 if (ret != E_OK) {
1281 // GetUploadCount will return E_OK when upload count is zero.
1282 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1283 return ret;
1284 }
1285 if (count == 0) {
1286 UpdateProcessInfoWithoutUpload(taskId, tableName, !lastTable);
1287 return E_OK;
1288 }
1289 UploadParam param;
1290 param.count = count;
1291 param.lastTable = lastTable;
1292 param.taskId = taskId;
1293 param.lockAction = lockAction;
1294 return DoUploadInner(tableName, param);
1295 }
1296
PreProcessBatchUpload(UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData)1297 int CloudSyncer::PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1298 CloudSyncData &uploadData)
1299 {
1300 // Precheck and calculate local water mark which would be updated if batch upload successed.
1301 int ret = CheckTaskIdValid(uploadParam.taskId);
1302 if (ret != E_OK) {
1303 return ret;
1304 }
1305 ret = CloudSyncUtils::CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName,
1306 innerProcessInfo.upLoadInfo.total);
1307 if (ret != E_OK) {
1308 LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1309 return ret;
1310 }
1311 TagUploadAssets(uploadData);
1312 CloudSyncUtils::UpdateAssetsFlag(uploadData);
1313 // get local water mark to be updated in future.
1314 ret = CloudSyncUtils::UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total,
1315 uploadParam.taskId, uploadParam.localMark);
1316 if (ret != E_OK) {
1317 LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1318 }
1319 return ret;
1320 }
1321
SaveCloudWaterMark(const TableName & tableName,const TaskId taskId)1322 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName, const TaskId taskId)
1323 {
1324 std::string cloudWaterMark;
1325 bool isUpdateCloudCursor = true;
1326 {
1327 std::lock_guard<std::mutex> autoLock(dataLock_);
1328 if (currentContext_.cloudWaterMarks.find(currentContext_.currentUserIndex) ==
1329 currentContext_.cloudWaterMarks.end() ||
1330 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].find(tableName) ==
1331 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex].end()) {
1332 LOGD("[CloudSyncer] Not found water mark just return");
1333 return E_OK;
1334 }
1335 cloudWaterMark = currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][tableName];
1336 isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1337 }
1338 isUpdateCloudCursor = isUpdateCloudCursor && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId));
1339 if (isUpdateCloudCursor) {
1340 int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1341 if (errCode != E_OK) {
1342 LOGE("[CloudSyncer] Cannot set cloud water mark, %d.", errCode);
1343 }
1344 return errCode;
1345 }
1346 return E_OK;
1347 }
1348
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1349 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1350 {
1351 std::lock_guard<std::mutex> autoLock(dataLock_);
1352 uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1353 uploadData.isCompensatedTask = cloudTaskInfos_[taskId].compensatedTask;
1354 }
1355
IsModeForcePush(TaskId taskId)1356 bool CloudSyncer::IsModeForcePush(TaskId taskId)
1357 {
1358 std::lock_guard<std::mutex> autoLock(dataLock_);
1359 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1360 }
1361
IsModeForcePull(const TaskId taskId)1362 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1363 {
1364 std::lock_guard<std::mutex> autoLock(dataLock_);
1365 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1366 }
1367
IsPriorityTask(TaskId taskId)1368 bool CloudSyncer::IsPriorityTask(TaskId taskId)
1369 {
1370 std::lock_guard<std::mutex> autoLock(dataLock_);
1371 return cloudTaskInfos_[taskId].priorityTask;
1372 }
1373
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1374 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1375 {
1376 // type index of field in fields, true means mandatory filed
1377 static std::vector<std::tuple<std::string, int32_t, bool>> fieldAndIndex = {
1378 std::make_tuple(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>, true),
1379 std::make_tuple(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>, true),
1380 std::make_tuple(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>, true),
1381 std::make_tuple(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>, true),
1382 std::make_tuple(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>, true),
1383 std::make_tuple(CloudDbConstant::SHARING_RESOURCE_FIELD, TYPE_INDEX<std::string>, false)
1384 };
1385
1386 for (const auto &fieldIndex : fieldAndIndex) {
1387 if (datum.find(std::get<0>(fieldIndex)) == datum.end()) {
1388 if (!std::get<2>(fieldIndex)) { // 2 is index of mandatory flag
1389 continue;
1390 }
1391 LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", std::get<0>(fieldIndex).c_str());
1392 return -E_CLOUD_ERROR;
1393 }
1394 if (datum[std::get<0>(fieldIndex)].index() != static_cast<size_t>(std::get<1>(fieldIndex))) {
1395 LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", std::get<0>(fieldIndex).c_str());
1396 return -E_CLOUD_ERROR;
1397 }
1398 }
1399
1400 if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1401 CloudSyncUtils::RemoveDataExceptExtendInfo(datum, pkColNames);
1402 }
1403 std::lock_guard<std::mutex> autoLock(dataLock_);
1404 if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1405 LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1406 return -E_CLOUD_ERROR;
1407 }
1408 return E_OK;
1409 }
1410
QueryCloudData(TaskId taskId,const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1411 int CloudSyncer::QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
1412 DownloadData &downloadData)
1413 {
1414 VBucket extend;
1415 int ret = FillDownloadExtend(taskId, tableName, cloudWaterMark, extend);
1416 if (ret != E_OK) {
1417 return ret;
1418 }
1419 ret = cloudDB_.Query(tableName, extend, downloadData.data);
1420 if ((ret == E_OK || ret == -E_QUERY_END) && downloadData.data.empty()) {
1421 if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1422 LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1423 return -E_CLOUD_ERROR;
1424 }
1425 cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1426 LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1427 return ret;
1428 }
1429 if (ret == -E_QUERY_END) {
1430 LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1431 return -E_QUERY_END;
1432 }
1433 if (ret != E_OK) {
1434 LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1435 }
1436 return ret;
1437 }
1438
CheckTaskIdValid(TaskId taskId)1439 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1440 {
1441 if (closed_) {
1442 LOGE("[CloudSyncer] DB is closed.");
1443 return -E_DB_CLOSED;
1444 }
1445 std::lock_guard<std::mutex> autoLock(dataLock_);
1446 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1447 LOGE("[CloudSyncer] not found task.");
1448 return -E_INVALID_ARGS;
1449 }
1450 if (cloudTaskInfos_[taskId].pause) {
1451 LOGW("[CloudSyncer] check task %" PRIu64 " was paused!", taskId);
1452 return -E_TASK_PAUSED;
1453 }
1454 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1455 return cloudTaskInfos_[taskId].errCode;
1456 }
1457 return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1458 }
1459
GetCurrentTableName(std::string & tableName)1460 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1461 {
1462 std::lock_guard<std::mutex> autoLock(dataLock_);
1463 if (currentContext_.tableName.empty()) {
1464 return -E_BUSY;
1465 }
1466 tableName = currentContext_.tableName;
1467 return E_OK;
1468 }
1469
CheckQueueSizeWithNoLock(bool priorityTask)1470 int CloudSyncer::CheckQueueSizeWithNoLock(bool priorityTask)
1471 {
1472 int32_t limit = queuedManualSyncLimit_;
1473 if (!priorityTask && taskQueue_.size() >= static_cast<size_t>(limit)) {
1474 LOGW("[CloudSyncer] too much sync task");
1475 return -E_BUSY;
1476 } else if (priorityTask && priorityTaskQueue_.size() >= static_cast<size_t>(limit)) {
1477 LOGW("[CloudSyncer] too much priority sync task");
1478 return -E_BUSY;
1479 }
1480 return E_OK;
1481 }
1482
PrepareSync(TaskId taskId)1483 int CloudSyncer::PrepareSync(TaskId taskId)
1484 {
1485 std::lock_guard<std::mutex> autoLock(dataLock_);
1486 if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1487 LOGW("[CloudSyncer] Abort sync because syncer is closed");
1488 return -E_DB_CLOSED;
1489 }
1490 if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1491 LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1492 return -E_DB_CLOSED;
1493 }
1494 currentContext_.currentTaskId = taskId;
1495 cloudTaskInfos_[taskId].resume = cloudTaskInfos_[taskId].pause;
1496 cloudTaskInfos_[taskId].pause = false;
1497 cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1498 if (cloudTaskInfos_[taskId].resume) {
1499 auto tempLocker = currentContext_.locker;
1500 currentContext_ = resumeTaskInfos_[taskId].context;
1501 currentContext_.locker = tempLocker;
1502 } else {
1503 currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1504 currentContext_.strategy = StrategyFactory::BuildSyncStrategy(cloudTaskInfos_[taskId].mode, policy_);
1505 currentContext_.notifier->Init(cloudTaskInfos_[taskId].table, cloudTaskInfos_[taskId].devices,
1506 cloudTaskInfos_[taskId].users);
1507 currentContext_.processRecorder = std::make_shared<ProcessRecorder>();
1508 }
1509 LOGI("[CloudSyncer] exec storeId %.3s taskId %" PRIu64, cloudTaskInfos_[taskId].storeId.c_str(), taskId);
1510 return E_OK;
1511 }
1512
LockCloud(TaskId taskId)1513 int CloudSyncer::LockCloud(TaskId taskId)
1514 {
1515 int period;
1516 {
1517 auto res = cloudDB_.Lock();
1518 if (res.first != E_OK) {
1519 return res.first;
1520 }
1521 period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1522 }
1523 int errCode = StartHeartBeatTimer(period, taskId);
1524 if (errCode != E_OK) {
1525 UnlockCloud();
1526 }
1527 return errCode;
1528 }
1529
UnlockCloud()1530 int CloudSyncer::UnlockCloud()
1531 {
1532 FinishHeartBeatTimer();
1533 return cloudDB_.UnLock();
1534 }
1535
StartHeartBeatTimer(int period,TaskId taskId)1536 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1537 {
1538 if (timerId_ != 0u) {
1539 LOGW("[CloudSyncer] HeartBeat timer has been start!");
1540 return E_OK;
1541 }
1542 TimerId timerId = 0;
1543 int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1544 HeartBeat(timerId, taskId);
1545 return E_OK;
1546 }, nullptr, timerId);
1547 if (errCode != E_OK) {
1548 LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1549 return errCode;
1550 }
1551 timerId_ = timerId;
1552 return E_OK;
1553 }
1554
FinishHeartBeatTimer()1555 void CloudSyncer::FinishHeartBeatTimer()
1556 {
1557 if (timerId_ == 0u) {
1558 return;
1559 }
1560 RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1561 timerId_ = 0u;
1562 LOGD("[CloudSyncer] Finish heartbeat timer ok");
1563 }
1564
// Timer callback of the heartbeat timer: schedules a task that sends one heartbeat to the cloud database
// on behalf of taskId.
// timerId: id of the timer that fired; the call is ignored when it differs from timerId_.
// taskId: the sync task the heartbeat belongs to.
// NOTE(review): heartbeatCount_ / failedHeartbeatCount_ are touched under heartbeatMutex_ in some places,
// under dataLock_ in the first block of the scheduled task and without any lock in the threshold check, and
// currentContext_.currentTaskId is read under heartbeatMutex_ at the end of the task - confirm that this
// mixed locking is intended.
void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
{
    if (timerId_ != timerId) {
        return;
    }
    // this reference is released by the scheduled task, or at the bottom when scheduling fails
    IncObjRef(this);
    {
        std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
        // incremented per scheduled heartbeat and decremented when the scheduled task has finished
        heartbeatCount_[taskId]++;
    }
    int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
        {
            std::lock_guard<std::mutex> guard(dataLock_);
            // the task is no longer the current one: drop its counters and give up
            if (currentContext_.currentTaskId != taskId) {
                heartbeatCount_.erase(taskId);
                failedHeartbeatCount_.erase(taskId);
                DecObjRef(this);
                return;
            }
        }
        if (heartbeatCount_[taskId] >= HEARTBEAT_PERIOD) {
            // heartbeat block twice should finish task now
            SetTaskFailed(taskId, -E_CLOUD_ERROR);
        } else {
            int ret = cloudDB_.HeartBeat();
            if (ret != E_OK) {
                HeartBeatFailed(taskId, ret);
            } else {
                // a successful heartbeat resets the failure counter of the task
                failedHeartbeatCount_[taskId] = 0;
            }
        }
        {
            std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
            heartbeatCount_[taskId]--;
            if (currentContext_.currentTaskId != taskId) {
                heartbeatCount_.erase(taskId);
                failedHeartbeatCount_.erase(taskId);
            }
        }
        DecObjRef(this);
    });
    if (errCode != E_OK) {
        LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
        // the task will never run, so release the reference taken above
        DecObjRef(this);
    }
}
1611
HeartBeatFailed(TaskId taskId,int errCode)1612 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1613 {
1614 failedHeartbeatCount_[taskId]++;
1615 if (failedHeartbeatCount_[taskId] < MAX_HEARTBEAT_FAILED_LIMIT) {
1616 return;
1617 }
1618 LOGW("[CloudSyncer] heartbeat failed too much times!");
1619 FinishHeartBeatTimer();
1620 SetTaskFailed(taskId, errCode);
1621 }
1622
SetTaskFailed(TaskId taskId,int errCode)1623 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1624 {
1625 std::lock_guard<std::mutex> autoLock(dataLock_);
1626 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1627 return;
1628 }
1629 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1630 return;
1631 }
1632 cloudTaskInfos_[taskId].errCode = errCode;
1633 }
1634
GetCloudSyncTaskCount()1635 int32_t CloudSyncer::GetCloudSyncTaskCount()
1636 {
1637 std::lock_guard<std::mutex> autoLock(dataLock_);
1638 return static_cast<int32_t>(taskQueue_.size() + priorityTaskQueue_.size());
1639 }
1640
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1641 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1642 const RelationalSchemaObject &localSchema)
1643 {
1644 std::lock_guard<std::mutex> lock(syncMutex_);
1645 int index = 1;
1646 for (const auto &tableName: tableNameList) {
1647 LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1648 int ret = storageProxy_->CleanWaterMark(tableName);
1649 if (ret != E_OK) {
1650 LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1651 return ret;
1652 }
1653 index++;
1654 }
1655 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1656 if (errCode != E_OK) {
1657 LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1658 return errCode;
1659 }
1660
1661 std::vector<Asset> assets;
1662 errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1663 if (errCode != E_OK) {
1664 LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1665 storageProxy_->Rollback();
1666 return errCode;
1667 }
1668
1669 if (!assets.empty() && mode == FLAG_AND_DATA) {
1670 errCode = cloudDB_.RemoveLocalAssets(assets);
1671 if (errCode != E_OK) {
1672 LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1673 storageProxy_->Rollback();
1674 return errCode;
1675 }
1676 }
1677
1678 storageProxy_->Commit();
1679 return errCode;
1680 }
1681
CleanWaterMarkInMemory(const std::set<std::string> & tableNameList)1682 int CloudSyncer::CleanWaterMarkInMemory(const std::set<std::string> &tableNameList)
1683 {
1684 std::lock_guard<std::mutex> lock(syncMutex_);
1685 for (const auto &tableName: tableNameList) {
1686 int ret = storageProxy_->CleanWaterMarkInMemory(tableName);
1687 if (ret != E_OK) {
1688 LOGE("[CloudSyncer] failed to clean cloud water mark in memory, %d.", ret);
1689 return ret;
1690 }
1691 }
1692 return E_OK;
1693 }
1694
UpdateCloudWaterMark(TaskId taskId,const SyncParam & param)1695 void CloudSyncer::UpdateCloudWaterMark(TaskId taskId, const SyncParam ¶m)
1696 {
1697 {
1698 std::lock_guard<std::mutex> autoLock(dataLock_);
1699 currentContext_.cloudWaterMarks[currentContext_.currentUserIndex][param.info.tableName] = param.cloudWaterMark;
1700 }
1701 }
1702
CommitDownloadResult(const DownloadItem & downloadItem,InnerProcessInfo & info,DownloadCommitList & commitList,int errCode)1703 int CloudSyncer::CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
1704 DownloadCommitList &commitList, int errCode)
1705 {
1706 if (commitList.empty()) {
1707 return E_OK;
1708 }
1709 uint32_t successCount = 0u;
1710 int ret = HandleDownloadResult(downloadItem, info.tableName, commitList, successCount);
1711 if (errCode == E_OK) {
1712 info.downLoadInfo.failCount += (commitList.size() - successCount);
1713 info.downLoadInfo.successCount -= (commitList.size() - successCount);
1714 }
1715 if (ret != E_OK) {
1716 LOGE("Commit download result failed.%d", ret);
1717 }
1718 commitList.clear();
1719 return ret;
1720 }
1721
GetIdentify() const1722 std::string CloudSyncer::GetIdentify() const
1723 {
1724 return id_;
1725 }
1726
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)1727 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult)
1728 {
1729 strategyOpResult = OpType::NOT_HANDLE;
1730 // ignore same record with local generate data
1731 if (dataInfo.localInfo.logInfo.device.empty() &&
1732 !CloudSyncUtils::NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
1733 // not handle same data
1734 return E_OK;
1735 }
1736 {
1737 std::lock_guard<std::mutex> autoLock(dataLock_);
1738 if (!currentContext_.strategy) {
1739 LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
1740 return -E_INTERNAL_ERROR;
1741 }
1742 bool isCloudWin = storageProxy_->IsTagCloudUpdateLocal(dataInfo.localInfo.logInfo,
1743 dataInfo.cloudLogInfo, policy_);
1744 strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, isCloudWin,
1745 dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo);
1746 }
1747 if (strategyOpResult == OpType::DELETE) {
1748 param.deletePrimaryKeySet.insert(dataInfo.localInfo.logInfo.hashKey);
1749 }
1750 return E_OK;
1751 }
1752
GetLocalInfo(size_t index,SyncParam & param,DataInfoWithLog & logInfo,std::map<std::string,LogInfo> & localLogInfoCache,VBucket & localAssetInfo)1753 int CloudSyncer::GetLocalInfo(size_t index, SyncParam ¶m, DataInfoWithLog &logInfo,
1754 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo)
1755 {
1756 int errCode = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[index],
1757 logInfo, localAssetInfo);
1758 if (errCode != E_OK && errCode != -E_NOT_FOUND) {
1759 return errCode;
1760 }
1761 std::string hashKey(logInfo.logInfo.hashKey.begin(), logInfo.logInfo.hashKey.end());
1762 if (hashKey.empty()) {
1763 return errCode;
1764 }
1765 param.downloadData.existDataKey[index] = logInfo.logInfo.dataKey;
1766 param.downloadData.existDataHashKey[index] = logInfo.logInfo.hashKey;
1767 if (localLogInfoCache.find(hashKey) != localLogInfoCache.end()) {
1768 LOGD("[CloudSyncer] exist same record in one batch, override from cache record! hash=%.3s",
1769 DBCommon::TransferStringToHex(hashKey).c_str());
1770 logInfo.logInfo.flag = localLogInfoCache[hashKey].flag;
1771 logInfo.logInfo.wTimestamp = localLogInfoCache[hashKey].wTimestamp;
1772 logInfo.logInfo.timestamp = localLogInfoCache[hashKey].timestamp;
1773 logInfo.logInfo.cloudGid = localLogInfoCache[hashKey].cloudGid;
1774 logInfo.logInfo.device = localLogInfoCache[hashKey].device;
1775 logInfo.logInfo.sharingResource = localLogInfoCache[hashKey].sharingResource;
1776 logInfo.logInfo.status = localLogInfoCache[hashKey].status;
1777 // delete record should remove local asset info
1778 if ((localLogInfoCache[hashKey].flag & DataItem::DELETE_FLAG) == DataItem::DELETE_FLAG) {
1779 localAssetInfo.clear();
1780 }
1781 errCode = E_OK;
1782 }
1783 logInfo.logInfo.isNeedUpdateAsset = IsNeedUpdateAsset(localAssetInfo);
1784 return errCode;
1785 }
1786
GetNextTaskId()1787 TaskId CloudSyncer::GetNextTaskId()
1788 {
1789 std::lock_guard<std::mutex> autoLock(dataLock_);
1790 if (!priorityTaskQueue_.empty()) {
1791 return priorityTaskQueue_.front();
1792 }
1793 if (!taskQueue_.empty()) {
1794 return taskQueue_.front();
1795 }
1796 return INVALID_TASK_ID;
1797 }
1798
MarkCurrentTaskPausedIfNeed()1799 void CloudSyncer::MarkCurrentTaskPausedIfNeed()
1800 {
1801 std::lock_guard<std::mutex> autoLock(dataLock_);
1802 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1803 return;
1804 }
1805 if (cloudTaskInfos_.find(currentContext_.currentTaskId) == cloudTaskInfos_.end()) {
1806 return;
1807 }
1808 if (!cloudTaskInfos_[currentContext_.currentTaskId].priorityTask) {
1809 cloudTaskInfos_[currentContext_.currentTaskId].pause = true;
1810 LOGD("[CloudSyncer] Mark taskId %" PRIu64 " paused success", currentContext_.currentTaskId);
1811 }
1812 }
1813
SetCurrentTaskFailedWithoutLock(int errCode)1814 void CloudSyncer::SetCurrentTaskFailedWithoutLock(int errCode)
1815 {
1816 if (currentContext_.currentTaskId == INVALID_TASK_ID) {
1817 return;
1818 }
1819 cloudTaskInfos_[currentContext_.currentTaskId].errCode = errCode;
1820 }
1821
LockCloudIfNeed(TaskId taskId)1822 int CloudSyncer::LockCloudIfNeed(TaskId taskId)
1823 {
1824 {
1825 std::lock_guard<std::mutex> autoLock(dataLock_);
1826 if (currentContext_.locker != nullptr) {
1827 LOGD("[CloudSyncer] lock exist");
1828 return E_OK;
1829 }
1830 }
1831 std::shared_ptr<CloudLocker> locker = nullptr;
1832 int errCode = CloudLocker::BuildCloudLock([taskId, this]() {
1833 return LockCloud(taskId);
1834 }, [this]() {
1835 int unlockCode = UnlockCloud();
1836 if (unlockCode != E_OK) {
1837 SetCurrentTaskFailedWithoutLock(unlockCode);
1838 }
1839 }, locker);
1840 {
1841 std::lock_guard<std::mutex> autoLock(dataLock_);
1842 currentContext_.locker = locker;
1843 }
1844 return errCode;
1845 }
1846
UnlockIfNeed()1847 void CloudSyncer::UnlockIfNeed()
1848 {
1849 std::shared_ptr<CloudLocker> cacheLocker;
1850 {
1851 std::lock_guard<std::mutex> autoLock(dataLock_);
1852 if (currentContext_.locker == nullptr) {
1853 LOGW("[CloudSyncer] locker is nullptr when unlock it"); // should not happen
1854 }
1855 cacheLocker = currentContext_.locker;
1856 currentContext_.locker = nullptr;
1857 }
1858 // unlock without mutex
1859 cacheLocker = nullptr;
1860 }
1861
ClearCurrentContextWithoutLock()1862 void CloudSyncer::ClearCurrentContextWithoutLock()
1863 {
1864 heartbeatCount_.erase(currentContext_.currentTaskId);
1865 failedHeartbeatCount_.erase(currentContext_.currentTaskId);
1866 currentContext_.currentTaskId = INVALID_TASK_ID;
1867 currentContext_.notifier = nullptr;
1868 currentContext_.strategy = nullptr;
1869 currentContext_.processRecorder = nullptr;
1870 currentContext_.tableName.clear();
1871 currentContext_.assetDownloadList.clear();
1872 currentContext_.assetFields.clear();
1873 currentContext_.assetsInfo.clear();
1874 currentContext_.cloudWaterMarks.clear();
1875 currentContext_.isNeedUpload = false;
1876 currentContext_.currentUserIndex = 0;
1877 currentContext_.repeatCount = 0;
1878 }
1879
ClearContextAndNotify(TaskId taskId,int errCode)1880 void CloudSyncer::ClearContextAndNotify(TaskId taskId, int errCode)
1881 {
1882 std::shared_ptr<ProcessNotifier> notifier = nullptr;
1883 CloudTaskInfo info;
1884 {
1885 // clear current context
1886 std::lock_guard<std::mutex> autoLock(dataLock_);
1887 notifier = currentContext_.notifier;
1888 ClearCurrentContextWithoutLock();
1889 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
1890 LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
1891 contextCv_.notify_one();
1892 return;
1893 }
1894 info = std::move(cloudTaskInfos_[taskId]);
1895 cloudTaskInfos_.erase(taskId);
1896 resumeTaskInfos_.erase(taskId);
1897 }
1898 contextCv_.notify_one();
1899 if (info.errCode == E_OK) {
1900 info.errCode = errCode;
1901 }
1902 LOGI("[CloudSyncer] finished storeId %.3s taskId %" PRIu64 " errCode %d", info.storeId.c_str(), taskId,
1903 info.errCode);
1904 info.status = ProcessStatus::FINISHED;
1905 if (notifier != nullptr) {
1906 notifier->NotifyProcess(info, {}, true);
1907 }
1908 // generate compensated sync
1909 if (!info.compensatedTask) {
1910 CloudTaskInfo taskInfo = CloudSyncUtils::InitCompensatedSyncTaskInfo(info);
1911 GenerateCompensatedSync(taskInfo);
1912 }
1913 }
1914
DownloadOneBatch(TaskId taskId,SyncParam & param,bool isFirstDownload)1915 int CloudSyncer::DownloadOneBatch(TaskId taskId, SyncParam ¶m, bool isFirstDownload)
1916 {
1917 int ret = CheckTaskIdValid(taskId);
1918 if (ret != E_OK) {
1919 return ret;
1920 }
1921 bool abort = false;
1922 ret = DownloadDataFromCloud(taskId, param, abort, isFirstDownload);
1923 if (abort) {
1924 return ret;
1925 }
1926 // Save data in transaction, update cloud water mark, notify process and changed data
1927 ret = SaveDataNotifyProcess(taskId, param);
1928 if (ret == -E_TASK_PAUSED) {
1929 return ret;
1930 }
1931 if (ret != E_OK) {
1932 std::lock_guard<std::mutex> autoLock(dataLock_);
1933 param.info.tableStatus = ProcessStatus::FINISHED;
1934 currentContext_.notifier->UpdateProcess(param.info);
1935 return ret;
1936 }
1937 (void)NotifyInDownload(taskId, param, isFirstDownload);
1938 return SaveCloudWaterMark(param.tableName, taskId);
1939 }
1940
DownloadOneAssetRecord(const std::set<Key> & dupHashKeySet,const DownloadList & downloadList,DownloadItem & downloadItem,InnerProcessInfo & info,ChangedData & changedAssets)1941 int CloudSyncer::DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
1942 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets)
1943 {
1944 CloudStorageUtils::EraseNoChangeAsset(downloadItem.assets);
1945 if (downloadItem.assets.empty()) { // Download data (include deleting)
1946 return E_OK;
1947 }
1948 bool isSharedTable = false;
1949 int errorCode = storageProxy_->IsSharedTable(info.tableName, isSharedTable);
1950 if (errorCode != E_OK) {
1951 LOGE("[CloudSyncer] DownloadOneAssetRecord cannot judge the table is a shared table. %d", errorCode);
1952 return errorCode;
1953 }
1954 if (!isSharedTable) {
1955 errorCode = DownloadAssetsOneByOne(info, downloadItem, downloadItem.assets);
1956 if (errorCode == -E_NOT_SET) {
1957 return -E_NOT_SET;
1958 }
1959 } else {
1960 // share table will not download asset, need to reset the status
1961 for (auto &entry: downloadItem.assets) {
1962 for (auto &asset: entry.second) {
1963 asset.status = AssetStatus::NORMAL;
1964 }
1965 }
1966 }
1967 if (errorCode != E_OK) {
1968 info.downLoadInfo.failCount += 1;
1969 if (info.downLoadInfo.successCount == 0) {
1970 LOGW("[CloudSyncer] Invalid successCount");
1971 } else {
1972 info.downLoadInfo.successCount -= 1;
1973 }
1974 }
1975 if (!downloadItem.assets.empty()) {
1976 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
1977 changedAssets.primaryData[CloudSyncUtils::OpTypeToChangeType(downloadItem.strategy)].push_back(
1978 downloadItem.primaryKeyValList);
1979 } else if (downloadItem.strategy == OpType::INSERT) {
1980 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
1981 }
1982 }
1983 // If the assets are DELETE, needn't fill back cloud assets.
1984 if (downloadItem.strategy == OpType::DELETE) {
1985 return E_OK;
1986 }
1987 return errorCode;
1988 }
1989
GetSyncParamForDownload(TaskId taskId,SyncParam & param)1990 int CloudSyncer::GetSyncParamForDownload(TaskId taskId, SyncParam ¶m)
1991 {
1992 int ret = E_OK;
1993 if (IsCurrentTableResume(taskId, false)) {
1994 std::lock_guard<std::mutex> autoLock(dataLock_);
1995 if (resumeTaskInfos_[taskId].syncParam.tableName == currentContext_.tableName) {
1996 param = resumeTaskInfos_[taskId].syncParam;
1997 resumeTaskInfos_[taskId].syncParam = {};
1998 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1999 if (ret != E_OK) {
2000 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data when table is resume: %d.", ret);
2001 }
2002 LOGD("[CloudSyncer] Get sync param from cache");
2003 return E_OK;
2004 }
2005 }
2006 ret = GetCurrentTableName(param.tableName);
2007 if (ret != E_OK) {
2008 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
2009 return ret;
2010 }
2011 param.info.tableName = param.tableName;
2012 std::vector<Field> assetFields;
2013 // only no primary key and composite primary key contains rowid.
2014 ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
2015 if (ret != E_OK) {
2016 LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
2017 return ret;
2018 }
2019 {
2020 std::lock_guard<std::mutex> autoLock(dataLock_);
2021 currentContext_.assetFields[currentContext_.tableName] = assetFields;
2022 }
2023 param.isSinglePrimaryKey = CloudSyncUtils::IsSinglePrimaryKey(param.pkColNames);
2024 if (!IsModeForcePull(taskId) && !(IsPriorityTask(taskId) && !IsQueryListEmpty(taskId))) {
2025 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
2026 if (ret != E_OK) {
2027 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
2028 }
2029 }
2030 currentContext_.notifier->GetDownloadInfoByTableName(param.info);
2031 return ret;
2032 }
2033
IsCurrentTableResume(TaskId taskId,bool upload)2034 bool CloudSyncer::IsCurrentTableResume(TaskId taskId, bool upload)
2035 {
2036 std::lock_guard<std::mutex> autoLock(dataLock_);
2037 if (!cloudTaskInfos_[taskId].resume) {
2038 return false;
2039 }
2040 if (currentContext_.tableName != resumeTaskInfos_[taskId].context.tableName) {
2041 return false;
2042 }
2043 return upload == resumeTaskInfos_[taskId].upload;
2044 }
2045
DownloadDataFromCloud(TaskId taskId,SyncParam & param,bool & abort,bool isFirstDownload)2046 int CloudSyncer::DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool &abort,
2047 bool isFirstDownload)
2048 {
2049 // Get cloud data after cloud water mark
2050 param.info.tableStatus = ProcessStatus::PROCESSING;
2051 param.downloadData = {};
2052 int ret = QueryCloudData(taskId, param.info.tableName, param.cloudWaterMark, param.downloadData);
2053 if (ret == -E_QUERY_END) {
2054 // Won't break here since downloadData may not be null
2055 param.isLastBatch = true;
2056 } else if (ret != E_OK) {
2057 std::lock_guard<std::mutex> autoLock(dataLock_);
2058 param.info.tableStatus = ProcessStatus::FINISHED;
2059 currentContext_.notifier->UpdateProcess(param.info);
2060 abort = true;
2061 return ret;
2062 }
2063 if (param.downloadData.data.empty()) {
2064 if (ret == E_OK || isFirstDownload) {
2065 LOGD("[CloudSyncer] try to query cloud data use increment water mark");
2066 UpdateCloudWaterMark(taskId, param);
2067 // Cloud water may change on the cloud, it needs to be saved here
2068 SaveCloudWaterMark(param.tableName, taskId);
2069 }
2070 if (isFirstDownload) {
2071 NotifyInEmptyDownload(taskId, param.info);
2072 }
2073 abort = true;
2074 }
2075 return E_OK;
2076 }
2077
GetDownloadAssetIndex(TaskId taskId)2078 size_t CloudSyncer::GetDownloadAssetIndex(TaskId taskId)
2079 {
2080 size_t index = 0u;
2081 std::lock_guard<std::mutex> autoLock(dataLock_);
2082 if (resumeTaskInfos_[taskId].lastDownloadIndex != 0u) {
2083 index = resumeTaskInfos_[taskId].lastDownloadIndex;
2084 resumeTaskInfos_[taskId].lastDownloadIndex = 0u;
2085 }
2086 return index;
2087 }
2088
GetCurrentTableUploadBatchIndex()2089 uint32_t CloudSyncer::GetCurrentTableUploadBatchIndex()
2090 {
2091 std::lock_guard<std::mutex> autoLock(dataLock_);
2092 return currentContext_.notifier->GetUploadBatchIndex(currentContext_.tableName);
2093 }
2094
ResetCurrentTableUploadBatchIndex()2095 void CloudSyncer::ResetCurrentTableUploadBatchIndex()
2096 {
2097 std::lock_guard<std::mutex> autoLock(dataLock_);
2098 currentContext_.notifier->ResetUploadBatchIndex(currentContext_.tableName);
2099 }
2100
RecordWaterMark(TaskId taskId,Timestamp waterMark)2101 void CloudSyncer::RecordWaterMark(TaskId taskId, Timestamp waterMark)
2102 {
2103 std::lock_guard<std::mutex> autoLock(dataLock_);
2104 resumeTaskInfos_[taskId].lastLocalWatermark = waterMark;
2105 }
2106
GetResumeWaterMark(TaskId taskId)2107 Timestamp CloudSyncer::GetResumeWaterMark(TaskId taskId)
2108 {
2109 std::lock_guard<std::mutex> autoLock(dataLock_);
2110 return resumeTaskInfos_[taskId].lastLocalWatermark;
2111 }
2112
GetInnerProcessInfo(const std::string & tableName,UploadParam & uploadParam)2113 CloudSyncer::InnerProcessInfo CloudSyncer::GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam)
2114 {
2115 InnerProcessInfo info;
2116 info.tableName = tableName;
2117 info.tableStatus = ProcessStatus::PROCESSING;
2118 ReloadUploadInfoIfNeed(uploadParam.taskId, uploadParam, info);
2119 return info;
2120 }
2121
SetGenCloudVersionCallback(const GenerateCloudVersionCallback & callback)2122 void CloudSyncer::SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback)
2123 {
2124 cloudDB_.SetGenCloudVersionCallback(callback);
2125 }
2126
CopyAndClearTaskInfos()2127 std::vector<CloudSyncer::CloudTaskInfo> CloudSyncer::CopyAndClearTaskInfos()
2128 {
2129 std::vector<CloudTaskInfo> infoList;
2130 std::lock_guard<std::mutex> autoLock(dataLock_);
2131 for (const auto &item: cloudTaskInfos_) {
2132 infoList.push_back(item.second);
2133 }
2134 taskQueue_.clear();
2135 priorityTaskQueue_.clear();
2136 cloudTaskInfos_.clear();
2137 resumeTaskInfos_.clear();
2138 currentContext_.notifier = nullptr;
2139 return infoList;
2140 }
2141
WaitCurTaskFinished()2142 void CloudSyncer::WaitCurTaskFinished()
2143 {
2144 LOGD("[CloudSyncer] begin wait current task finished");
2145 std::unique_lock<std::mutex> uniqueLock(dataLock_);
2146 contextCv_.wait(uniqueLock, [this]() {
2147 return currentContext_.currentTaskId == INVALID_TASK_ID;
2148 });
2149 LOGD("[CloudSyncer] current task has been finished");
2150 }
2151 } // namespace DistributedDB
2152