1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud_syncer.h"
16
17 #include <cstdint>
18 #include <utility>
19 #include <unordered_map>
20
21 #include "cloud/cloud_db_constant.h"
22 #include "cloud/cloud_storage_utils.h"
23 #include "cloud_sync_tag_assets.h"
24 #include "cloud_sync_utils.h"
25 #include "db_errno.h"
26 #include "cloud/icloud_db.h"
27 #include "kv_store_errno.h"
28 #include "log_print.h"
29 #include "platform_specific.h"
30 #include "runtime_context.h"
31 #include "strategy_factory.h"
32 #include "storage_proxy.h"
33 #include "store_types.h"
34
35 namespace DistributedDB {
36 namespace {
37 const TaskId INVALID_TASK_ID = 0u;
38 const int MAX_HEARTBEAT_FAILED_LIMIT = 2;
39 const int HEARTBEAT_PERIOD = 3;
40 const int MAX_DOWNLOAD_COMMIT_LIMIT = 5;
41 }
42
CloudSyncer(std::shared_ptr<StorageProxy> storageProxy)43 CloudSyncer::CloudSyncer(std::shared_ptr<StorageProxy> storageProxy)
44 : currentTaskId_(INVALID_TASK_ID),
45 storageProxy_(std::move(storageProxy)),
46 queuedManualSyncLimit_(DBConstant::QUEUED_SYNC_LIMIT_DEFAULT),
47 closed_(false),
48 timerId_(0u),
49 heartBeatCount_(0),
50 failedHeartBeatCount_(0),
51 syncCallbackCount_(0)
52 {
53 if (storageProxy_ != nullptr) {
54 id_ = storageProxy_->GetIdentify();
55 }
56 }
57
Sync(const std::vector<DeviceID> & devices,SyncMode mode,const std::vector<std::string> & tables,const SyncProcessCallback & callback,int64_t waitTime)58 int CloudSyncer::Sync(const std::vector<DeviceID> &devices, SyncMode mode,
59 const std::vector<std::string> &tables, const SyncProcessCallback &callback, int64_t waitTime)
60 {
61 int errCode = CheckParamValid(devices, mode);
62 if (errCode != E_OK) {
63 return errCode;
64 }
65 if (cloudDB_.IsNotExistCloudDB()) {
66 return -E_CLOUD_ERROR;
67 }
68 if (closed_) {
69 return -E_DB_CLOSED;
70 }
71 CloudTaskInfo taskInfo;
72 taskInfo.mode = mode;
73 taskInfo.table = tables;
74 taskInfo.callback = callback;
75 taskInfo.timeout = waitTime;
76 taskInfo.devices = devices;
77 errCode = TryToAddSyncTask(std::move(taskInfo));
78 if (errCode != E_OK) {
79 return errCode;
80 }
81 return TriggerSync();
82 }
83
SetCloudDB(const std::shared_ptr<ICloudDb> & cloudDB)84 void CloudSyncer::SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB)
85 {
86 cloudDB_.SetCloudDB(cloudDB);
87 LOGI("[CloudSyncer] SetCloudDB finish");
88 }
89
SetIAssetLoader(const std::shared_ptr<IAssetLoader> & loader)90 void CloudSyncer::SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader)
91 {
92 cloudDB_.SetIAssetLoader(loader);
93 LOGI("[CloudSyncer] SetIAssetLoader finish");
94 }
95
Close()96 void CloudSyncer::Close()
97 {
98 closed_ = true;
99 CloudSyncer::TaskId currentTask;
100 {
101 std::lock_guard<std::mutex> autoLock(contextLock_);
102 currentTask = currentContext_.currentTaskId;
103 }
104 // mark current task db_closed
105 SetTaskFailed(currentTask, -E_DB_CLOSED);
106 cloudDB_.Close();
107 {
108 LOGD("[CloudSyncer] begin wait current task finished");
109 std::unique_lock<std::mutex> uniqueLock(contextLock_);
110 contextCv_.wait(uniqueLock, [this]() {
111 return currentContext_.currentTaskId == INVALID_TASK_ID;
112 });
113 LOGD("[CloudSyncer] current task has been finished");
114 }
115
116 // copy all task from queue
117 std::vector<CloudTaskInfo> infoList;
118 {
119 std::lock_guard<std::mutex> autoLock(queueLock_);
120 for (const auto &item: cloudTaskInfos_) {
121 infoList.push_back(item.second);
122 }
123 taskQueue_.clear();
124 cloudTaskInfos_.clear();
125 }
126 // notify all DB_CLOSED
127 for (auto &info: infoList) {
128 info.status = ProcessStatus::FINISHED;
129 info.errCode = -E_DB_CLOSED;
130 ProcessNotifier notifier(this);
131 notifier.Init(info.table, info.devices);
132 notifier.NotifyProcess(info, {}, true);
133 LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", info.taskId, info.errCode);
134 }
135 storageProxy_->Close();
136 WaitAllSyncCallbackTaskFinish();
137 }
138
TriggerSync()139 int CloudSyncer::TriggerSync()
140 {
141 if (closed_) {
142 return -E_DB_CLOSED;
143 }
144 RefObject::IncObjRef(this);
145 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this]() {
146 DoSyncIfNeed();
147 RefObject::DecObjRef(this);
148 });
149 if (errCode != E_OK) {
150 LOGW("[CloudSyncer] schedule sync task failed %d", errCode);
151 RefObject::DecObjRef(this);
152 }
153 return errCode;
154 }
155
DoSyncIfNeed()156 void CloudSyncer::DoSyncIfNeed()
157 {
158 if (closed_) {
159 return;
160 }
161 // get taskId from queue
162 TaskId triggerTaskId;
163 {
164 std::lock_guard<std::mutex> autoLock(queueLock_);
165 if (taskQueue_.empty()) {
166 return;
167 }
168 triggerTaskId = taskQueue_.front();
169 }
170 // pop taskId in queue
171 if (PrepareSync(triggerTaskId) != E_OK) {
172 return;
173 }
174 // do sync logic
175 int errCode = DoSync(triggerTaskId);
176 // finished after sync
177 DoFinished(triggerTaskId, errCode, {});
178 // do next task async
179 (void)TriggerSync();
180 }
181
DoSync(TaskId taskId)182 int CloudSyncer::DoSync(TaskId taskId)
183 {
184 std::lock_guard<std::mutex> lock(syncMutex_);
185 CloudTaskInfo taskInfo;
186 {
187 std::lock_guard<std::mutex> autoLock(queueLock_);
188 taskInfo = cloudTaskInfos_[taskId];
189 }
190 int errCode = LockCloud(taskId);
191 if (errCode != E_OK) {
192 return errCode;
193 }
194 bool needUpload = true;
195 {
196 std::lock_guard<std::mutex> autoLock(contextLock_);
197 needUpload = currentContext_.strategy->JudgeUpload();
198 }
199 errCode = DoSyncInner(taskInfo, needUpload);
200 int unlockCode = UnlockCloud();
201 if (errCode == E_OK) {
202 errCode = unlockCode;
203 }
204 return errCode;
205 }
206
DoUploadInNeed(const CloudTaskInfo & taskInfo,const bool needUpload)207 int CloudSyncer::DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload)
208 {
209 if (!needUpload) {
210 return E_OK;
211 }
212 int errCode = storageProxy_->StartTransaction();
213 if (errCode != E_OK) {
214 LOGE("[CloudSyncer] start transaction Failed before doing upload.");
215 return errCode;
216 }
217 for (size_t i = 0; i < taskInfo.table.size(); ++i) {
218 LOGD("[CloudSyncer] try upload table, index: %zu", i);
219 errCode = CheckTaskIdValid(taskInfo.taskId);
220 if (errCode != E_OK) {
221 LOGE("[CloudSyncer] task is invalid, abort sync");
222 break;
223 }
224 {
225 std::lock_guard<std::mutex> autoLock(contextLock_);
226 currentContext_.tableName = taskInfo.table[i];
227 }
228 errCode = DoUpload(taskInfo.taskId, i == (taskInfo.table.size() - 1u));
229 if (errCode != E_OK) {
230 LOGE("[CloudSyncer] upload failed %d", errCode);
231 break;
232 }
233 errCode = SaveCloudWaterMark(taskInfo.table[i]);
234 if (errCode != E_OK) {
235 LOGE("[CloudSyncer] Can not save cloud water mark after uploading %d", errCode);
236 break;
237 }
238 }
239 if (errCode == E_OK) {
240 int commitErrorCode = storageProxy_->Commit();
241 if (commitErrorCode != E_OK) {
242 LOGE("[CloudSyncer] cannot commit transaction: %d.", commitErrorCode);
243 }
244 } else {
245 int rollBackErrorCode = storageProxy_->Rollback();
246 if (rollBackErrorCode != E_OK) {
247 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
248 }
249 }
250 return errCode;
251 }
252
DoSyncInner(const CloudTaskInfo & taskInfo,const bool needUpload)253 int CloudSyncer::DoSyncInner(const CloudTaskInfo &taskInfo, const bool needUpload)
254 {
255 int errCode = E_OK;
256 for (size_t i = 0; i < taskInfo.table.size(); ++i) {
257 LOGD("[CloudSyncer] try download table, index: %zu", i);
258 errCode = CheckTaskIdValid(taskInfo.taskId);
259 if (errCode != E_OK) {
260 LOGE("[CloudSyncer] task is invalid, abort sync");
261 return errCode;
262 }
263 {
264 std::lock_guard<std::mutex> autoLock(contextLock_);
265 currentContext_.tableName = taskInfo.table[i];
266 }
267 errCode = DoDownload(taskInfo.taskId);
268 if (errCode != E_OK) {
269 LOGE("[CloudSyncer] download failed %d", errCode);
270 return errCode;
271 }
272 if (needUpload) {
273 continue;
274 }
275 errCode = SaveCloudWaterMark(taskInfo.table[i]);
276 if (errCode != E_OK) {
277 LOGE("[CloudSyncer] Can not save cloud water mark after downloading %d", errCode);
278 return errCode;
279 }
280 }
281
282 return DoUploadInNeed(taskInfo, needUpload);
283 }
284
DoFinished(TaskId taskId,int errCode,const InnerProcessInfo & processInfo)285 void CloudSyncer::DoFinished(TaskId taskId, int errCode, const InnerProcessInfo &processInfo)
286 {
287 {
288 std::lock_guard<std::mutex> autoLock(queueLock_);
289 taskQueue_.remove(taskId);
290 }
291 std::shared_ptr<ProcessNotifier> notifier = nullptr;
292 {
293 // check current task is running or not
294 std::lock_guard<std::mutex> autoLock(contextLock_);
295 if (currentContext_.currentTaskId != taskId) { // should not happen
296 LOGW("[CloudSyncer] taskId %" PRIu64 " not exist in context!", taskId);
297 return;
298 }
299 currentContext_.currentTaskId = INVALID_TASK_ID;
300 notifier = currentContext_.notifier;
301 currentContext_.notifier = nullptr;
302 currentContext_.strategy = nullptr;
303 currentContext_.tableName.clear();
304 currentContext_.assetDownloadList.completeDownloadList.clear();
305 currentContext_.assetDownloadList.downloadList.clear();
306 currentContext_.assetFields.clear();
307 currentContext_.assetsInfo.clear();
308 currentContext_.cloudWaterMarks.clear();
309 }
310 CloudTaskInfo info;
311 {
312 std::lock_guard<std::mutex> autoLock(queueLock_);
313 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) { // should not happen
314 LOGW("[CloudSyncer] taskId %" PRIu64 " has been finished!", taskId);
315 contextCv_.notify_one();
316 return;
317 }
318 info = std::move(cloudTaskInfos_[taskId]);
319 cloudTaskInfos_.erase(taskId);
320 }
321 contextCv_.notify_one();
322 if (info.errCode == E_OK) {
323 info.errCode = errCode;
324 }
325 LOGI("[CloudSyncer] finished taskId %" PRIu64 " errCode %d", taskId, info.errCode);
326 info.status = ProcessStatus::FINISHED;
327 if (notifier != nullptr) {
328 notifier->NotifyProcess(info, processInfo, true);
329 }
330 }
331
SaveChangedDataByType(const VBucket & datum,ChangedData & changedData,const DataInfoWithLog & localInfo,ChangeType type)332 static int SaveChangedDataByType(const VBucket &datum, ChangedData &changedData, const DataInfoWithLog &localInfo,
333 ChangeType type)
334 {
335 int ret = E_OK;
336 std::vector<Type> cloudPkVals;
337 if (type == ChangeType::OP_DELETE) {
338 ret = GetCloudPkVals(localInfo.primaryKeys, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
339 } else {
340 ret = GetCloudPkVals(datum, changedData.field, localInfo.logInfo.dataKey, cloudPkVals);
341 }
342 if (ret != E_OK) {
343 return ret;
344 }
345 changedData.primaryData[type].emplace_back(std::move(cloudPkVals));
346 return E_OK;
347 }
348
FindDeletedListIndex(const std::vector<std::pair<Key,size_t>> & deletedList,const Key & hashKey,size_t & delIdx)349 int CloudSyncer::FindDeletedListIndex(const std::vector<std::pair<Key, size_t>> &deletedList, const Key &hashKey,
350 size_t &delIdx)
351 {
352 for (std::pair<Key, size_t> pair : deletedList) {
353 if (pair.first == hashKey) {
354 delIdx = pair.second;
355 return E_OK;
356 }
357 }
358 return -E_INTERNAL_ERROR;
359 }
360
SaveChangedData(SyncParam & param,int downIndex,const DataInfo & dataInfo,std::vector<std::pair<Key,size_t>> & deletedList)361 int CloudSyncer::SaveChangedData(SyncParam ¶m, int downIndex, const DataInfo &dataInfo,
362 std::vector<std::pair<Key, size_t>> &deletedList)
363 {
364 OpType opType = param.downloadData.opType[downIndex];
365 Key hashKey = dataInfo.localInfo.logInfo.hashKey;
366 if (param.deletePrimaryKeySet.find(hashKey) != param.deletePrimaryKeySet.end()) {
367 if (opType == OpType::INSERT) {
368 size_t delIdx;
369 int errCode = FindDeletedListIndex(deletedList, hashKey, delIdx);
370 if (errCode != E_OK) {
371 LOGE("[CloudSyncer] FindDeletedListIndex could not find delete item.");
372 return errCode;
373 }
374 param.changedData.primaryData[ChangeType::OP_DELETE].erase(
375 param.changedData.primaryData[ChangeType::OP_DELETE].begin() + delIdx);
376 (void)param.dupHashKeySet.insert(hashKey);
377 opType = OpType::UPDATE;
378 // only composite primary key needs to be processed.
379 if (!param.isSinglePrimaryKey) {
380 param.withoutRowIdData.updateData.push_back(std::make_tuple(downIndex,
381 param.changedData.primaryData[ChangeType::OP_UPDATE].size()));
382 }
383 } else if (opType == OpType::DELETE) {
384 std::pair<Key, size_t> pair{hashKey, static_cast<size_t>(
385 param.changedData.primaryData[ChangeType::OP_DELETE].size())};
386 deletedList.emplace_back(pair);
387 } else {
388 LOGW("[CloudSyncer] deletePrimaryKeySet ignore opType %d.", opType);
389 }
390 }
391 // INSERT: for no primary key or composite primary key situation
392 if (!param.isSinglePrimaryKey && opType == OpType::INSERT) {
393 param.withoutRowIdData.insertData.push_back(downIndex);
394 return E_OK;
395 }
396 switch (opType) {
397 // INSERT: only for single primary key situation
398 case OpType::INSERT:
399 return SaveChangedDataByType(
400 param.downloadData.data[downIndex], param.changedData, dataInfo.localInfo, ChangeType::OP_INSERT);
401 case OpType::UPDATE:
402 if (NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
403 return SaveChangedDataByType(param.downloadData.data[downIndex], param.changedData,
404 dataInfo.localInfo, ChangeType::OP_UPDATE);
405 }
406 break;
407 case OpType::DELETE:
408 return SaveChangedDataByType(param.downloadData.data[downIndex], param.changedData,
409 dataInfo.localInfo, ChangeType::OP_DELETE);
410 default:
411 break;
412 }
413 return E_OK;
414 }
415
GetCloudLogInfo(VBucket & datum)416 static LogInfo GetCloudLogInfo(VBucket &datum)
417 {
418 LogInfo cloudLogInfo = { 0 };
419 cloudLogInfo.timestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::MODIFY_FIELD]);
420 cloudLogInfo.wTimestamp = (Timestamp)std::get<int64_t>(datum[CloudDbConstant::CREATE_FIELD]);
421 cloudLogInfo.flag = (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) ? 1u : 0u;
422 cloudLogInfo.cloudGid = std::get<std::string>(datum[CloudDbConstant::GID_FIELD]);
423 return cloudLogInfo;
424 }
425
426 /**
427 * UpdateChangedData will be used for Insert case, which we can only get rowid after we saved data in db.
428 */
UpdateChangedData(SyncParam & param,AssetDownloadList & assetsDownloadList)429 int CloudSyncer::UpdateChangedData(SyncParam ¶m, AssetDownloadList &assetsDownloadList)
430 {
431 if (param.withoutRowIdData.insertData.empty() && param.withoutRowIdData.updateData.empty()) {
432 return E_OK;
433 }
434 int ret = E_OK;
435 for (size_t j : param.withoutRowIdData.insertData) {
436 VBucket &datum = param.downloadData.data[j];
437 std::vector<Type> primaryValues;
438 ret = GetCloudPkVals(datum, param.changedData.field,
439 std::get<int64_t>(datum[CloudDbConstant::ROW_ID_FIELD_NAME]), primaryValues);
440 if (ret != E_OK) {
441 LOGE("[CloudSyncer] updateChangedData cannot get primaryValues");
442 return ret;
443 }
444 param.changedData.primaryData[ChangeType::OP_INSERT].push_back(primaryValues);
445 }
446 for (const auto &tuple : param.withoutRowIdData.assetInsertData) {
447 size_t downloadIndex = std::get<0>(tuple);
448 VBucket &datum = param.downloadData.data[downloadIndex];
449 size_t insertIdx = std::get<1>(tuple);
450 std::vector<Type> &pkVal = std::get<5>(assetsDownloadList.downloadList[insertIdx]); // 5 means primary key list
451 pkVal[0] = datum[CloudDbConstant::ROW_ID_FIELD_NAME];
452 }
453 for (const auto &tuple : param.withoutRowIdData.updateData) {
454 size_t downloadIndex = std::get<0>(tuple);
455 size_t updateIndex = std::get<1>(tuple);
456 VBucket &datum = param.downloadData.data[downloadIndex];
457 size_t size = param.changedData.primaryData[ChangeType::OP_UPDATE].size();
458 if (updateIndex >= size) {
459 LOGE("[CloudSyncer] updateIndex is invalid. index=%zu, size=%zu", updateIndex, size);
460 return -E_INTERNAL_ERROR;
461 }
462 if (param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex].empty()) {
463 LOGE("[CloudSyncer] primary key value list should not be empty.");
464 return -E_INTERNAL_ERROR;
465 }
466 // no primary key or composite primary key, the first element is rowid
467 param.changedData.primaryData[ChangeType::OP_UPDATE][updateIndex][0] =
468 datum[CloudDbConstant::ROW_ID_FIELD_NAME];
469 }
470 return ret;
471 }
472
IsDataContainDuplicateAsset(const std::vector<Field> & assetFields,VBucket & data)473 bool CloudSyncer::IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data)
474 {
475 for (const auto &assetField : assetFields) {
476 if (assetField.type == TYPE_INDEX<Assets> && data[assetField.colName].index() == TYPE_INDEX<Assets>) {
477 if (CloudStorageUtils::IsAssetsContainDuplicateAsset(std::get<Assets>(data[assetField.colName]))) {
478 return true;
479 }
480 }
481 }
482 return false;
483 }
484
IsDataContainAssets()485 bool CloudSyncer::IsDataContainAssets()
486 {
487 std::lock_guard<std::mutex> autoLock(contextLock_);
488 bool hasTable = (currentContext_.assetFields.find(currentContext_.tableName) != currentContext_.assetFields.end());
489 if (!hasTable) {
490 LOGW("[CloudSyncer] failed to get assetFields, because tableName doesn't exit in currentContext, %d.",
491 -E_INTERNAL_ERROR);
492 return false;
493 }
494 if (hasTable && currentContext_.assetFields[currentContext_.tableName].empty()) {
495 LOGI("[CloudSyncer] Current table do not contain assets, thereby we needn't download assets");
496 return false;
497 }
498 return true;
499 }
500
IncSyncCallbackTaskCount()501 void CloudSyncer::IncSyncCallbackTaskCount()
502 {
503 std::lock_guard<std::mutex> autoLock(syncCallbackMutex_);
504 syncCallbackCount_++;
505 }
506
DecSyncCallbackTaskCount()507 void CloudSyncer::DecSyncCallbackTaskCount()
508 {
509 {
510 std::lock_guard<std::mutex> autoLock(syncCallbackMutex_);
511 syncCallbackCount_--;
512 }
513 syncCallbackCv_.notify_all();
514 }
515
WaitAllSyncCallbackTaskFinish()516 void CloudSyncer::WaitAllSyncCallbackTaskFinish()
517 {
518 std::unique_lock<std::mutex> uniqueLock(syncCallbackMutex_);
519 LOGD("[CloudSyncer] Begin wait all callback task finish");
520 syncCallbackCv_.wait(uniqueLock, [this]() {
521 return syncCallbackCount_ <= 0;
522 });
523 LOGD("[CloudSyncer] End wait all callback task finish");
524 }
525
TagAssetsInSingleRecord(VBucket & coveredData,VBucket & beCoveredData,bool setNormalStatus,int & errCode)526 std::map<std::string, Assets> CloudSyncer::TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
527 bool setNormalStatus, int &errCode)
528 {
529 // Define a map to store the result
530 std::map<std::string, Assets> res = {};
531 std::vector<Field> assetFields;
532 {
533 std::lock_guard<std::mutex> autoLock(contextLock_);
534 assetFields = currentContext_.assetFields[currentContext_.tableName];
535 }
536 // For every column contain asset or assets, assetFields are in context
537 for (const Field &assetField : assetFields) {
538 Assets assets = TagAssetsInSingleCol(coveredData, beCoveredData, assetField, setNormalStatus, errCode);
539 if (!assets.empty()) {
540 res[assetField.colName] = assets;
541 }
542 if (errCode != E_OK) {
543 break;
544 }
545 }
546 return res;
547 }
548
FillCloudAssets(const std::string & tableName,VBucket & normalAssets,VBucket & failedAssets)549 int CloudSyncer::FillCloudAssets(
550 const std::string &tableName, VBucket &normalAssets, VBucket &failedAssets)
551 {
552 int ret = E_OK;
553 if (normalAssets.size() > 1) {
554 ret = storageProxy_->FillCloudAssetForDownload(tableName, normalAssets, true);
555 if (ret != E_OK) {
556 LOGE("[CloudSyncer] Can not fill normal cloud assets for download");
557 return ret;
558 }
559 }
560 if (failedAssets.size() > 1) {
561 ret = storageProxy_->FillCloudAssetForDownload(tableName, failedAssets, false);
562 if (ret != E_OK) {
563 LOGE("[CloudSyncer] Can not fill abnormal assets for download");
564 return ret;
565 }
566 }
567 return E_OK;
568 }
569
HandleDownloadResult(const std::string & tableName,DownloadCommitList & commitList,uint32_t & successCount)570 int CloudSyncer::HandleDownloadResult(const std::string &tableName, DownloadCommitList &commitList,
571 uint32_t &successCount)
572 {
573 successCount = 0;
574 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
575 if (errCode != E_OK) {
576 LOGE("[CloudSyncer] start transaction Failed before handle download.");
577 return errCode;
578 }
579 errCode = storageProxy_->SetLogTriggerStatus(false);
580 if (errCode != E_OK) {
581 return errCode;
582 }
583 for (size_t i = 0; i < commitList.size(); i++) {
584 std::string gid = std::get<0>(commitList[i]); // 0 means gid is the first element in assetsInfo
585 // 1 means assetsMap info [colName, assets] is the forth element in downloadList[i]
586 std::map<std::string, Assets> assetsMap = std::get<1>(commitList[i]);
587 bool setAllNormal = std::get<2>(commitList[i]); // 2 means whether the download return is E_OK
588 VBucket normalAssets;
589 VBucket failedAssets;
590 normalAssets[CloudDbConstant::GID_FIELD] = gid;
591 failedAssets[CloudDbConstant::GID_FIELD] = gid;
592 for (auto &assetKvPair : assetsMap) {
593 Assets &assets = assetKvPair.second;
594 if (setAllNormal) {
595 normalAssets[assetKvPair.first] = std::move(assets);
596 } else {
597 failedAssets[assetKvPair.first] = std::move(assets);
598 }
599 }
600 errCode = FillCloudAssets(tableName, normalAssets, failedAssets);
601 if (errCode != E_OK) {
602 break;
603 }
604 successCount++;
605 }
606 errCode = storageProxy_->SetLogTriggerStatus(true);
607 if (errCode != E_OK) {
608 LOGE("Set log trigger true failed when handle download.%d", errCode);
609 successCount = 0;
610 storageProxy_->Rollback();
611 return errCode;
612 }
613 errCode = storageProxy_->Commit();
614 if (errCode != E_OK) {
615 successCount = 0;
616 }
617 return errCode;
618 }
619
CloudDbDownloadAssets(InnerProcessInfo & info,DownloadList & downloadList,bool willHandleResult,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)620 int CloudSyncer::CloudDbDownloadAssets(InnerProcessInfo &info, DownloadList &downloadList, bool willHandleResult,
621 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
622 {
623 int downloadStatus = E_OK;
624 DownloadCommitList commitList;
625 for (size_t i = 0; i < downloadList.size(); i++) {
626 DownloadItem downloadItem;
627 GetDownloadItem(downloadList, i, downloadItem);
628 std::map<std::string, Assets> downloadAssets(downloadItem.assets);
629 CloudStorageUtils::EraseNoChangeAsset(downloadAssets);
630 if (downloadAssets.empty()) { // Download data (include deleting)
631 continue;
632 }
633 int errorCode = cloudDB_.Download(info.tableName, downloadItem.gid, downloadItem.prefix, downloadAssets);
634 if (errorCode == -E_NOT_SET) {
635 info.downLoadInfo.failCount += (downloadList.size() - i);
636 info.downLoadInfo.successCount -= (downloadList.size() - i);
637 return -E_NOT_SET;
638 }
639 if (dupHashKeySet.find(downloadItem.hashKey) == dupHashKeySet.end()) {
640 changedAssets.primaryData[OpTypeToChangeType(downloadItem.strategy)].push_back(
641 downloadItem.primaryKeyValList);
642 } else if (downloadItem.strategy == OpType::INSERT) {
643 changedAssets.primaryData[ChangeType::OP_UPDATE].push_back(downloadItem.primaryKeyValList);
644 }
645 if (!willHandleResult) {
646 continue;
647 }
648 CloudStorageUtils::MergeDownloadAsset(downloadAssets, downloadItem.assets);
649 // Process result of each asset
650 commitList.push_back(std::make_tuple(downloadItem.gid, std::move(downloadItem.assets), errorCode == E_OK));
651 downloadStatus = downloadStatus == E_OK ? errorCode : downloadStatus;
652 if ((i + 1) % MAX_DOWNLOAD_COMMIT_LIMIT == 0 || i == (commitList.size() - 1)) {
653 int ret = CommitDownloadResult(info, commitList);
654 if (ret != E_OK) {
655 return ret;
656 }
657 }
658 }
659 if (!commitList.empty()) {
660 int ret = CommitDownloadResult(info, commitList);
661 if (ret != E_OK) {
662 return ret;
663 }
664 }
665 LOGD("Download status is %d", downloadStatus);
666 return downloadStatus;
667 }
668
GetDownloadItem(const DownloadList & downloadList,size_t i,DownloadItem & downloadItem)669 void CloudSyncer::GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem)
670 {
671 downloadItem.gid = std::get<0>(downloadList[i]); // 0 means gid is the first element in assetsInfo
672 downloadItem.prefix = std::get<1>(downloadList[i]); // 1 means primaryKey is the second element in assetsInfo
673 downloadItem.strategy = std::get<2>(downloadList[i]); // 2 means strategy is the third element in assetsInfo
674 // 3 means assets info [colName, assets] is the forth element in downloadList[i]
675 downloadItem.assets = std::get<3>(downloadList[i]);
676 downloadItem.hashKey = std::get<4>(downloadList[i]); // 4 means hash key
677 downloadItem.primaryKeyValList = std::get<5>(downloadList[i]); // 5 means primary key value list
678 }
679
DownloadAssets(InnerProcessInfo & info,const std::vector<std::string> & pKColNames,const std::set<Key> & dupHashKeySet,ChangedData & changedAssets)680 int CloudSyncer::DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
681 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets)
682 {
683 if (!IsDataContainAssets()) {
684 return E_OK;
685 }
686 // update changed data info
687 if (!IsChngDataEmpty(changedAssets)) {
688 // changedData.primaryData should have no member inside
689 return -E_INVALID_ARGS;
690 }
691 changedAssets.tableName = info.tableName;
692 changedAssets.type = ChangedDataType::ASSET;
693 changedAssets.field = pKColNames;
694 // Get AssetDownloadList
695 DownloadList downloadList;
696 DownloadList completeDeletedList;
697 {
698 std::lock_guard<std::mutex> autoLock(contextLock_);
699 downloadList = currentContext_.assetDownloadList.downloadList;
700 completeDeletedList = currentContext_.assetDownloadList.completeDownloadList;
701 }
702 // Download data (include deleting) will handle return Code in this situation
703 int ret = CloudDbDownloadAssets(info, downloadList, true, dupHashKeySet, changedAssets);
704 if (ret != E_OK) {
705 LOGE("[CloudSyncer] Can not download assets or can not handle download result %d", ret);
706 return ret;
707 }
708 // Download data (include deleting), won't handle return Code in this situation
709 ret = CloudDbDownloadAssets(info, completeDeletedList, false, dupHashKeySet, changedAssets);
710 if (ret != E_OK) {
711 LOGE("[CloudSyncer] Can not download assets or can not handle download result for deleted record %d", ret);
712 }
713 return ret;
714 }
715
GetAssetsFromVBucket(VBucket & data)716 std::map<std::string, Assets> CloudSyncer::GetAssetsFromVBucket(VBucket &data)
717 {
718 std::map<std::string, Assets> assets;
719 std::vector<Field> fields;
720 {
721 std::lock_guard<std::mutex> autoLock(contextLock_);
722 fields = currentContext_.assetFields[currentContext_.tableName];
723 }
724 for (const auto &field : fields) {
725 if (data.find(field.colName) != data.end()) {
726 if (field.type == TYPE_INDEX<Asset> && data[field.colName].index() == TYPE_INDEX<Asset>) {
727 assets[field.colName] = { std::get<Asset>(data[field.colName]) };
728 } else if (field.type == TYPE_INDEX<Assets> && data[field.colName].index() == TYPE_INDEX<Assets>) {
729 assets[field.colName] = std::get<Assets>(data[field.colName]);
730 } else {
731 Assets emptyAssets;
732 assets[field.colName] = emptyAssets;
733 }
734 }
735 }
736 return assets;
737 }
738
TagStatus(bool isExist,SyncParam & param,size_t idx,DataInfo & dataInfo,VBucket & localAssetInfo)739 int CloudSyncer::TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo)
740 {
741 OpType strategyOpResult = OpType::NOT_HANDLE;
742 int errCode = TagStatusByStrategy(isExist, param, dataInfo, strategyOpResult);
743 if (errCode != E_OK) {
744 return errCode;
745 }
746 param.downloadData.opType[idx] = strategyOpResult;
747 if (!IsDataContainAssets()) {
748 return E_OK;
749 }
750 Key hashKey;
751 if (isExist) {
752 hashKey = dataInfo.localInfo.logInfo.hashKey;
753 }
754 return TagDownloadAssets(hashKey, idx, param, dataInfo, localAssetInfo);
755 }
756
TagDownloadAssets(const Key & hashKey,size_t idx,SyncParam & param,DataInfo & dataInfo,VBucket & localAssetInfo)757 int CloudSyncer::TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo,
758 VBucket &localAssetInfo)
759 {
760 int ret = E_OK;
761 OpType strategy = param.downloadData.opType[idx];
762 switch (strategy) {
763 case OpType::INSERT:
764 case OpType::UPDATE:
765 case OpType::DELETE:
766 ret = HandleTagAssets(hashKey, idx, param, dataInfo, localAssetInfo);
767 break;
768 case OpType::NOT_HANDLE:
769 case OpType::ONLY_UPDATE_GID:
770 case OpType::SET_CLOUD_FORCE_PUSH_FLAG_ZERO: { // means upload need this data
771 // Save the asset info into context
772 std::map<std::string, Assets> assetsMap = GetAssetsFromVBucket(param.downloadData.data[idx]);
773 {
774 std::lock_guard<std::mutex> autoLock(contextLock_);
775 if (currentContext_.assetsInfo.find(param.tableName) == currentContext_.assetsInfo.end()) {
776 currentContext_.assetsInfo[param.tableName] = {};
777 }
778 currentContext_.assetsInfo[param.tableName][dataInfo.cloudLogInfo.cloudGid] = assetsMap;
779 }
780 break;
781 }
782 default:
783 break;
784 }
785 return ret;
786 }
787
HandleTagAssets(const Key & hashKey,size_t idx,SyncParam & param,DataInfo & dataInfo,VBucket & localAssetInfo)788 int CloudSyncer::HandleTagAssets(const Key &hashKey, size_t idx, SyncParam ¶m, DataInfo &dataInfo,
789 VBucket &localAssetInfo)
790 {
791 Type prefix;
792 std::vector<Type> pkVals;
793 OpType strategy = param.downloadData.opType[idx];
794 bool isDelStrategy = (strategy == OpType::DELETE);
795 int ret = GetCloudPkVals(isDelStrategy ? dataInfo.localInfo.primaryKeys : param.downloadData.data[idx],
796 param.pkColNames, dataInfo.localInfo.logInfo.dataKey, pkVals);
797 if (ret != E_OK) {
798 LOGE("[CloudSyncer] HandleTagAssets cannot get primary key value list. %d", ret);
799 return ret;
800 }
801 prefix = param.isSinglePrimaryKey ? pkVals[0] : prefix;
802 if (param.isSinglePrimaryKey && prefix.index() == TYPE_INDEX<Nil>) {
803 LOGE("[CloudSyncer] Invalid primary key type in TagStatus, it's Nil.");
804 return -E_INTERNAL_ERROR;
805 }
806 std::map<std::string, Assets> assetsMap = TagAssetsInSingleRecord(param.downloadData.data[idx], localAssetInfo,
807 false, ret);
808 if (ret != E_OK) {
809 LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR");
810 return ret;
811 }
812 if (isDelStrategy) {
813 param.assetsDownloadList.completeDownloadList.push_back(
814 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey, pkVals));
815 } else {
816 if (!param.isSinglePrimaryKey && strategy == OpType::INSERT) {
817 param.withoutRowIdData.assetInsertData.push_back(std::make_tuple(idx,
818 param.assetsDownloadList.downloadList.size()));
819 }
820 param.assetsDownloadList.downloadList.push_back(
821 std::make_tuple(dataInfo.cloudLogInfo.cloudGid, prefix, strategy, assetsMap, hashKey, pkVals));
822 }
823 return ret;
824 }
825
SaveDatum(SyncParam & param,size_t idx,std::vector<std::pair<Key,size_t>> & deletedList)826 int CloudSyncer::SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList)
827 {
828 int ret = PreHandleData(param.downloadData.data[idx], param.pkColNames);
829 if (ret != E_OK) {
830 LOGE("[CloudSyncer] Invalid download data:%d", ret);
831 return ret;
832 }
833 ModifyCloudDataTime(param.downloadData.data[idx]);
834 DataInfo dataInfo;
835 VBucket localAssetInfo;
836 bool isExist = true;
837 ret = storageProxy_->GetInfoByPrimaryKeyOrGid(param.tableName, param.downloadData.data[idx], dataInfo.localInfo,
838 localAssetInfo);
839 if (ret == -E_NOT_FOUND) {
840 isExist = false;
841 } else if (ret != E_OK) {
842 LOGE("[CloudSyncer] Cannot get info by primary key or gid: %d.", ret);
843 return ret;
844 }
845 // Get cloudLogInfo from cloud data
846 dataInfo.cloudLogInfo = GetCloudLogInfo(param.downloadData.data[idx]);
847 // Tag datum to get opType
848 ret = TagStatus(isExist, param, idx, dataInfo, localAssetInfo);
849 if (ret != E_OK) {
850 LOGE("[CloudSyncer] Cannot tag status: %d.", ret);
851 return ret;
852 }
853 ret = SaveChangedData(param, idx, dataInfo, deletedList);
854 if (ret != E_OK) {
855 LOGE("[CloudSyncer] Cannot save changed data: %d.", ret);
856 }
857 return ret;
858 }
859
ClearWithoutData(SyncParam & param)860 void CloudSyncer::ClearWithoutData(SyncParam ¶m)
861 {
862 param.withoutRowIdData.insertData.clear();
863 param.withoutRowIdData.updateData.clear();
864 param.withoutRowIdData.assetInsertData.clear();
865 }
866
SaveData(SyncParam & param)867 int CloudSyncer::SaveData(SyncParam ¶m)
868 {
869 if (!IsChngDataEmpty(param.changedData)) {
870 LOGE("[CloudSyncer] changedData.primaryData should have no member inside.");
871 return -E_INVALID_ARGS;
872 }
873 // Update download batch Info
874 param.info.downLoadInfo.batchIndex += 1;
875 param.info.downLoadInfo.total += param.downloadData.data.size();
876 int ret = E_OK;
877 AssetDownloadList assetsDownloadList;
878 param.assetsDownloadList = assetsDownloadList;
879 param.deletePrimaryKeySet.clear();
880 param.dupHashKeySet.clear();
881 ClearWithoutData(param);
882 std::vector<std::pair<Key, size_t>> deletedList;
883 for (size_t i = 0; i < param.downloadData.data.size(); i++) {
884 ret = SaveDatum(param, i, deletedList);
885 if (ret != E_OK) {
886 param.info.downLoadInfo.failCount += param.downloadData.data.size();
887 LOGE("[CloudSyncer] Cannot save datum due to error code %d", ret);
888 return ret;
889 }
890 }
891 // Save assetsMap into current context
892 {
893 std::lock_guard<std::mutex> autoLock(contextLock_);
894 currentContext_.assetDownloadList = param.assetsDownloadList;
895 }
896 // save the data to the database by batch, downloadData will return rowid when insert data.
897 ret = storageProxy_->PutCloudSyncData(param.tableName, param.downloadData);
898 if (ret != E_OK) {
899 param.info.downLoadInfo.failCount += param.downloadData.data.size();
900 LOGE("[CloudSyncer] Cannot save the data to database with error code: %d.", ret);
901 return ret;
902 }
903 ret = UpdateChangedData(param, currentContext_.assetDownloadList);
904 if (ret != E_OK) {
905 param.info.downLoadInfo.failCount += param.downloadData.data.size();
906 LOGE("[CloudSyncer] Cannot update changed data: %d.", ret);
907 return ret;
908 }
909 // Update downloadInfo
910 param.info.downLoadInfo.successCount += param.downloadData.data.size();
911 // Get latest cloudWaterMark
912 VBucket &lastData = param.downloadData.data.back();
913 param.cloudWaterMark = std::get<std::string>(lastData[CloudDbConstant::CURSOR_FIELD]);
914 return E_OK;
915 }
916
PreCheck(CloudSyncer::TaskId & taskId,const TableName & tableName)917 int CloudSyncer::PreCheck(CloudSyncer::TaskId &taskId, const TableName &tableName)
918 {
919 // Check Input and Context Validity
920 int ret = CheckTaskIdValid(taskId);
921 if (ret != E_OK) {
922 return ret;
923 }
924 {
925 std::lock_guard<std::mutex> autoLock(queueLock_);
926 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
927 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: , %" PRIu64 ".", taskId);
928 return -E_INVALID_ARGS;
929 }
930 }
931 if (currentContext_.strategy == nullptr) {
932 LOGE("[CloudSyncer] Strategy has not been initialized");
933 return -E_INVALID_ARGS;
934 }
935 ret = storageProxy_->CheckSchema(tableName);
936 if (ret != E_OK) {
937 LOGE("[CloudSyncer] A schema error occurred on the table to be synced, %d", ret);
938 return ret;
939 }
940 return E_OK;
941 }
942
NeedNotifyChangedData(const ChangedData & changedData)943 bool CloudSyncer::NeedNotifyChangedData(const ChangedData &changedData)
944 {
945 {
946 std::lock_guard<std::mutex> autoLock(contextLock_);
947 if (IsModeForcePush(currentContext_.currentTaskId)) {
948 return false;
949 }
950 }
951 // when there have no data been changed, it needn't notified
952 if (changedData.primaryData[OP_INSERT].empty() && changedData.primaryData[OP_UPDATE].empty() &&
953 changedData.primaryData[OP_DELETE].empty()) {
954 return false;
955 }
956 return true;
957 }
958
NotifyChangedData(ChangedData && changedData)959 int CloudSyncer::NotifyChangedData(ChangedData &&changedData)
960 {
961 if (!NeedNotifyChangedData(changedData)) {
962 return E_OK;
963 }
964 std::string deviceName;
965 {
966 std::lock_guard<std::mutex> autoLock(contextLock_);
967 std::vector<std::string> devices = currentContext_.notifier->GetDevices();
968 if (devices.empty()) {
969 LOGE("[CloudSyncer] CurrentContext do not contain device info");
970 return -E_CLOUD_ERROR;
971 }
972 // We use first device name as the target of NotifyChangedData
973 deviceName = devices[0];
974 }
975 int ret = storageProxy_->NotifyChangedData(deviceName, std::move(changedData));
976 if (ret != E_OK) {
977 LOGE("[CloudSyncer] Cannot notify changed data while downloading, %d.", ret);
978 }
979 return ret;
980 }
981
NotifyInDownload(CloudSyncer::TaskId taskId,SyncParam & param)982 void CloudSyncer::NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m)
983 {
984 CloudTaskInfo taskInfo;
985 {
986 std::lock_guard<std::mutex> autoLock(queueLock_);
987 taskInfo = cloudTaskInfos_[taskId];
988 }
989 std::lock_guard<std::mutex> autoLock(contextLock_);
990
991 if (currentContext_.strategy->JudgeUpload()) {
992 currentContext_.notifier->NotifyProcess(taskInfo, param.info);
993 } else {
994 if (param.isLastBatch) {
995 param.info.tableStatus = ProcessStatus::FINISHED;
996 }
997 if (taskInfo.table.back() == param.tableName && param.isLastBatch) {
998 currentContext_.notifier->UpdateProcess(param.info);
999 } else {
1000 currentContext_.notifier->NotifyProcess(taskInfo, param.info);
1001 }
1002 }
1003 }
1004
SaveDataInTransaction(CloudSyncer::TaskId taskId,SyncParam & param)1005 int CloudSyncer::SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m)
1006 {
1007 int ret = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1008 if (ret != E_OK) {
1009 LOGE("[CloudSyncer] Cannot start a transaction: %d.", ret);
1010 return ret;
1011 }
1012 if (!IsModeForcePush(taskId)) {
1013 param.changedData.tableName = param.info.tableName;
1014 param.changedData.field = param.pkColNames;
1015 param.changedData.type = ChangedDataType::DATA;
1016 }
1017 ret = SaveData(param);
1018 if (ret != E_OK) {
1019 LOGE("[CloudSyncer] cannot save data: %d.", ret);
1020 int rollBackErrorCode = storageProxy_->Rollback();
1021 if (rollBackErrorCode != E_OK) {
1022 LOGE("[CloudSyncer] cannot roll back transaction: %d.", rollBackErrorCode);
1023 } else {
1024 LOGI("[CloudSyncer] Roll back transaction success: %d.", ret);
1025 }
1026 return ret;
1027 }
1028 ret = storageProxy_->Commit();
1029 if (ret != E_OK) {
1030 LOGE("[CloudSyncer] Cannot commit a transaction: %d.", ret);
1031 }
1032 return ret;
1033 }
1034
SaveDataNotifyProcess(CloudSyncer::TaskId taskId,SyncParam & param)1035 int CloudSyncer::SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m)
1036 {
1037 ChangedData changedData;
1038 param.changedData = changedData;
1039 param.downloadData.opType.resize(param.downloadData.data.size());
1040 int ret = SaveDataInTransaction(taskId, param);
1041 if (ret != E_OK) {
1042 return ret;
1043 }
1044 // call OnChange to notify changedData object first time (without Assets)
1045 ret = NotifyChangedData(std::move(param.changedData));
1046 if (ret != E_OK) {
1047 LOGE("[CloudSyncer] Cannot notify changed data due to error %d", ret);
1048 return ret;
1049 }
1050 // Begin downloading assets
1051 ChangedData changedAssets;
1052 ret = DownloadAssets(param.info, param.pkColNames, param.dupHashKeySet, changedAssets);
1053 (void)NotifyChangedData(std::move(changedAssets));
1054 if (ret != E_OK) {
1055 LOGE("[CloudSyncer] Cannot notify downloadAssets due to error %d", ret);
1056 return ret;
1057 }
1058 UpdateCloudWaterMark(param);
1059 return E_OK;
1060 }
1061
NotifyInBatchUpload(const UploadParam & uploadParam,const InnerProcessInfo & innerProcessInfo,bool lastBatch)1062 void CloudSyncer::NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
1063 bool lastBatch)
1064 {
1065 CloudTaskInfo taskInfo;
1066 {
1067 std::lock_guard<std::mutex> autoLock(queueLock_);
1068 taskInfo = cloudTaskInfos_[uploadParam.taskId];
1069 }
1070 std::lock_guard<std::mutex> autoLock(contextLock_);
1071 if (uploadParam.lastTable && lastBatch) {
1072 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1073 } else {
1074 currentContext_.notifier->NotifyProcess(taskInfo, innerProcessInfo);
1075 }
1076 }
1077
DoDownload(CloudSyncer::TaskId taskId)1078 int CloudSyncer::DoDownload(CloudSyncer::TaskId taskId)
1079 {
1080 SyncParam param;
1081 int ret = GetCurrentTableName(param.tableName);
1082 if (ret != E_OK) {
1083 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1084 return ret;
1085 }
1086 param.info.tableName = param.tableName;
1087 std::vector<Field> assetFields;
1088 // only no primary key and composite primary key contains rowid.
1089 ret = storageProxy_->GetPrimaryColNamesWithAssetsFields(param.tableName, param.pkColNames, assetFields);
1090 if (ret != E_OK) {
1091 LOGE("[CloudSyncer] Cannot get primary column names: %d", ret);
1092 return ret;
1093 }
1094 {
1095 std::lock_guard<std::mutex> autoLock(contextLock_);
1096 currentContext_.assetFields[currentContext_.tableName] = assetFields;
1097 }
1098 param.isSinglePrimaryKey = IsSinglePrimaryKey(param.pkColNames);
1099 param.cloudWaterMark = "";
1100 if (!IsModeForcePull(taskId)) {
1101 ret = storageProxy_->GetCloudWaterMark(param.tableName, param.cloudWaterMark);
1102 if (ret != E_OK) {
1103 LOGE("[CloudSyncer] Cannot get cloud water level from cloud meta data: %d.", ret);
1104 return ret;
1105 }
1106 }
1107 return DoDownloadInner(taskId, param);
1108 }
1109
DoDownloadInner(CloudSyncer::TaskId taskId,SyncParam & param)1110 int CloudSyncer::DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m)
1111 {
1112 // Query data by batch until reaching end and not more data need to be download
1113 int ret = PreCheck(taskId, param.info.tableName);
1114 if (ret != E_OK) {
1115 return ret;
1116 }
1117 bool queryEnd = false;
1118 while (!queryEnd) {
1119 // Get cloud data after cloud water mark
1120 param.info.tableStatus = ProcessStatus::PROCESSING;
1121 DownloadData downloadData;
1122 param.downloadData = downloadData;
1123 param.isLastBatch = false;
1124 ret = QueryCloudData(param.info.tableName, param.cloudWaterMark, param.downloadData);
1125 if (ret == -E_QUERY_END) {
1126 // Won't break here since downloadData may not be null
1127 queryEnd = true;
1128 param.isLastBatch = true;
1129 } else if (ret != E_OK) {
1130 std::lock_guard<std::mutex> autoLock(contextLock_);
1131 param.info.tableStatus = ProcessStatus::FINISHED;
1132 currentContext_.notifier->UpdateProcess(param.info);
1133 return ret;
1134 }
1135 if (param.downloadData.data.empty()) {
1136 if (ret == E_OK) {
1137 LOGD("[CloudSyncer] try to query cloud data use increment water mark");
1138 UpdateCloudWaterMark(param);
1139 continue;
1140 }
1141 NotifyInEmptyDownload(taskId, param.info);
1142 break;
1143 }
1144 // Save data in transaction, update cloud water mark, notify process and changed data
1145 ret = SaveDataNotifyProcess(taskId, param);
1146 if (ret != E_OK) {
1147 std::lock_guard<std::mutex> autoLock(contextLock_);
1148 param.info.tableStatus = ProcessStatus::FINISHED;
1149 currentContext_.notifier->UpdateProcess(param.info);
1150 return ret;
1151 }
1152 (void)NotifyInDownload(taskId, param);
1153 }
1154 return E_OK;
1155 }
1156
NotifyInEmptyDownload(CloudSyncer::TaskId taskId,InnerProcessInfo & info)1157 void CloudSyncer::NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info)
1158 {
1159 std::lock_guard<std::mutex> autoLock(contextLock_);
1160 if (currentContext_.strategy->JudgeUpload()) {
1161 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1162 } else {
1163 info.tableStatus = FINISHED;
1164 if (cloudTaskInfos_[taskId].table.back() == info.tableName) {
1165 currentContext_.notifier->UpdateProcess(info);
1166 } else {
1167 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], info);
1168 }
1169 }
1170 }
1171
PreCheckUpload(CloudSyncer::TaskId & taskId,const TableName & tableName,Timestamp & localMark)1172 int CloudSyncer::PreCheckUpload(CloudSyncer::TaskId &taskId, const TableName &tableName, Timestamp &localMark)
1173 {
1174 int ret = PreCheck(taskId, tableName);
1175 if (ret != E_OK) {
1176 return ret;
1177 }
1178 {
1179 std::lock_guard<std::mutex> autoLock(queueLock_);
1180 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1181 LOGE("[CloudSyncer] Cloud Task Info does not exist taskId: %" PRIu64 ".", taskId);
1182 return -E_INVALID_ARGS;
1183 }
1184 if ((cloudTaskInfos_[taskId].mode < SYNC_MODE_CLOUD_MERGE) ||
1185 (cloudTaskInfos_[taskId].mode > SYNC_MODE_CLOUD_FORCE_PUSH)) {
1186 LOGE("[CloudSyncer] Upload failed, invalid sync mode: %d.",
1187 static_cast<int>(cloudTaskInfos_[taskId].mode));
1188 return -E_INVALID_ARGS;
1189 }
1190 }
1191
1192 if (!IsModeForcePush(taskId)) {
1193 ret = storageProxy_->GetLocalWaterMark(tableName, localMark);
1194 if (ret != E_OK) {
1195 LOGE("[CloudSyncer] Failed to get local water mark when upload, %d.", ret);
1196 }
1197 }
1198 return ret;
1199 }
1200
CheckCloudSyncDataEmpty(const CloudSyncData & uploadData)1201 bool CloudSyncer::CheckCloudSyncDataEmpty(const CloudSyncData &uploadData)
1202 {
1203 return uploadData.insData.extend.empty() && uploadData.insData.record.empty() &&
1204 uploadData.updData.extend.empty() && uploadData.updData.record.empty() &&
1205 uploadData.delData.extend.empty() && uploadData.delData.record.empty();
1206 }
1207
DoBatchUpload(CloudSyncData & uploadData,UploadParam & uploadParam,InnerProcessInfo & innerProcessInfo)1208 int CloudSyncer::DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo)
1209 {
1210 int errCode = E_OK;
1211 Info insertInfo;
1212 Info updateInfo;
1213 Info deleteInfo;
1214
1215 if (!uploadData.delData.record.empty() && !uploadData.delData.extend.empty()) {
1216 errCode = cloudDB_.BatchDelete(uploadData.tableName, uploadData.delData.record,
1217 uploadData.delData.extend, deleteInfo);
1218 if (errCode != E_OK) {
1219 return errCode;
1220 }
1221 innerProcessInfo.upLoadInfo.successCount += deleteInfo.successCount;
1222 }
1223
1224 if (!uploadData.insData.record.empty() && !uploadData.insData.extend.empty()) {
1225 errCode = cloudDB_.BatchInsert(uploadData.tableName, uploadData.insData.record,
1226 uploadData.insData.extend, insertInfo);
1227 if (errCode != E_OK) {
1228 return errCode;
1229 }
1230 // we need to fill back gid after insert data to cloud.
1231 int ret = storageProxy_->FillCloudGidAndAsset(OpType::INSERT, uploadData);
1232 if (ret != E_OK) {
1233 LOGE("[CloudSyncer] Failed to fill back when doing upload insData, %d.", ret);
1234 return ret;
1235 }
1236 innerProcessInfo.upLoadInfo.successCount += insertInfo.successCount;
1237 }
1238
1239 if (!uploadData.updData.record.empty() && !uploadData.updData.extend.empty()) {
1240 errCode = cloudDB_.BatchUpdate(uploadData.tableName, uploadData.updData.record,
1241 uploadData.updData.extend, updateInfo);
1242 if (errCode != E_OK) {
1243 return errCode;
1244 }
1245 errCode = storageProxy_->FillCloudGidAndAsset(OpType::UPDATE, uploadData);
1246 if (errCode != E_OK) {
1247 LOGE("[CloudSyncer] Failed to fill back when doing upload updData, %d.", errCode);
1248 return errCode;
1249 }
1250 innerProcessInfo.upLoadInfo.successCount += updateInfo.successCount;
1251 }
1252 bool lastBatch = innerProcessInfo.upLoadInfo.successCount == innerProcessInfo.upLoadInfo.total;
1253 if (lastBatch) {
1254 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1255 }
1256 // After each batch upload successed, call NotifyProcess
1257 NotifyInBatchUpload(uploadParam, innerProcessInfo, lastBatch);
1258
1259 // if batch upload successed, update local water mark
1260 // The cloud water mark cannot be updated here, because the cloud api doesn't return cursor here.
1261 errCode = PutWaterMarkAfterBatchUpload(uploadData.tableName, uploadParam);
1262 if (errCode != E_OK) {
1263 LOGE("[CloudSyncer] Failed to set local water mark when doing upload, %d.", errCode);
1264 }
1265 return errCode;
1266 }
1267
PutWaterMarkAfterBatchUpload(const std::string & tableName,UploadParam & uploadParam)1268 int CloudSyncer::PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam)
1269 {
1270 int errCode = E_OK;
1271 // if we use local cover cloud strategy, it won't update local water mark also.
1272 if (IsModeForcePush(uploadParam.taskId)) {
1273 return E_OK;
1274 }
1275 errCode = storageProxy_->PutLocalWaterMark(tableName, uploadParam.localMark);
1276 if (errCode != E_OK) {
1277 LOGE("[CloudSyncer] Cannot set local water mark while Uploading, %d.", errCode);
1278 }
1279 return errCode;
1280 }
1281
DoUpload(CloudSyncer::TaskId taskId,bool lastTable)1282 int CloudSyncer::DoUpload(CloudSyncer::TaskId taskId, bool lastTable)
1283 {
1284 std::string tableName;
1285 int ret = GetCurrentTableName(tableName);
1286 if (ret != E_OK) {
1287 LOGE("[CloudSyncer] Invalid table name for syncing: %d", ret);
1288 return ret;
1289 }
1290
1291 Timestamp localMark = 0u;
1292 ret = PreCheckUpload(taskId, tableName, localMark);
1293 if (ret != E_OK) {
1294 LOGE("[CloudSyncer] Doing upload sync pre check failed, %d.", ret);
1295 return ret;
1296 }
1297
1298 int64_t count = 0;
1299 ret = storageProxy_->GetUploadCount(tableName, localMark, IsModeForcePush(taskId), count);
1300 if (ret != E_OK) {
1301 // GetUploadCount will return E_OK when upload count is zero.
1302 LOGE("[CloudSyncer] Failed to get Upload Data Count, %d.", ret);
1303 return ret;
1304 }
1305 if (count == 0) {
1306 LOGI("[CloudSyncer] There is no need to doing upload, as the upload data count is zero.");
1307 InnerProcessInfo innerProcessInfo;
1308 innerProcessInfo.tableName = tableName;
1309 innerProcessInfo.upLoadInfo.total = 0; // count is zero
1310 innerProcessInfo.tableStatus = ProcessStatus::FINISHED;
1311 {
1312 std::lock_guard<std::mutex> autoLock(contextLock_);
1313 if (lastTable) {
1314 currentContext_.notifier->UpdateProcess(innerProcessInfo);
1315 } else {
1316 currentContext_.notifier->NotifyProcess(cloudTaskInfos_[taskId], innerProcessInfo);
1317 }
1318 }
1319 return E_OK;
1320 }
1321 UploadParam param;
1322 param.count = count;
1323 param.localMark = localMark;
1324 param.lastTable = lastTable;
1325 param.taskId = taskId;
1326 return DoUploadInner(tableName, param);
1327 }
1328
TagUploadAssets(CloudSyncData & uploadData)1329 int CloudSyncer::TagUploadAssets(CloudSyncData &uploadData)
1330 {
1331 int errCode = E_OK;
1332 if (!IsDataContainAssets()) {
1333 return E_OK;
1334 }
1335 std::map<std::string, std::map<std::string, Assets>> cloudAssets;
1336 {
1337 std::lock_guard<std::mutex> autoLock(contextLock_);
1338 cloudAssets = currentContext_.assetsInfo[currentContext_.tableName];
1339 }
1340 // for delete scenario, assets should not appear in the records. Thereby we needn't tag the assests.
1341 // for insert scenario, gid does not exist. Thereby, we needn't compare with cloud asset get in download procedure
1342 for (size_t i = 0; i < uploadData.insData.extend.size(); i++) {
1343 VBucket cloudAsset; // cloudAsset must be empty
1344 (void)TagAssetsInSingleRecord(uploadData.insData.record[i], cloudAsset, true, errCode);
1345 if (errCode != E_OK) {
1346 LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in DELETE/INSERT option");
1347 return errCode;
1348 }
1349 }
1350 // for update scenario, assets shoulb be compared with asset get in download procedure.
1351 for (size_t i = 0; i < uploadData.updData.extend.size(); i++) {
1352 VBucket cloudAsset;
1353 // gid must exist in UPDATE scenario, cause we have re-fill gid during download procedure
1354 // But we need to check for safety
1355 auto gidIter = uploadData.updData.extend[i].find(CloudDbConstant::GID_FIELD);
1356 if (gidIter == uploadData.updData.extend[i].end()) {
1357 LOGE("[CloudSyncer] Datum to be upload must contain gid");
1358 return -E_INVALID_DATA;
1359 }
1360 // update data must contain gid, however, we could only pull data after water mark
1361 // Therefore, we need to check whether we contain the data
1362 std::string &gid = std::get<std::string>(gidIter->second);
1363 if (cloudAssets.find(gid) == cloudAssets.end()) {
1364 // In this case, we directly upload data without compartion and tagging
1365 std::vector<Field> assetFields;
1366 {
1367 std::lock_guard<std::mutex> autoLock(contextLock_);
1368 assetFields = currentContext_.assetFields[currentContext_.tableName];
1369 }
1370 StatusToFlagForAssetsInRecord(assetFields, uploadData.updData.record[i]);
1371 continue;
1372 }
1373 for (const auto &it : cloudAssets[gid]) {
1374 cloudAsset[it.first] = it.second;
1375 }
1376 (void)TagAssetsInSingleRecord(uploadData.updData.record[i], cloudAsset, true, errCode);
1377 if (errCode != E_OK) {
1378 LOGE("[CloudSyncer] TagAssetsInSingleRecord report ERROR in UPDATE option");
1379 return errCode;
1380 }
1381 }
1382 return E_OK;
1383 }
1384
PreProcessBatchUpload(TaskId taskId,const InnerProcessInfo & innerProcessInfo,CloudSyncData & uploadData,Timestamp & localMark)1385 int CloudSyncer::PreProcessBatchUpload(TaskId taskId, const InnerProcessInfo &innerProcessInfo,
1386 CloudSyncData &uploadData, Timestamp &localMark)
1387 {
1388 // Precheck and calculate local water mark which would be updated if batch upload successed.
1389 int ret = CheckTaskIdValid(taskId);
1390 if (ret != E_OK) {
1391 return ret;
1392 }
1393 ret = CheckCloudSyncDataValid(uploadData, innerProcessInfo.tableName, innerProcessInfo.upLoadInfo.total, taskId);
1394 if (ret != E_OK) {
1395 LOGE("[CloudSyncer] Invalid Cloud Sync Data of Upload, %d.", ret);
1396 return ret;
1397 }
1398 ret = TagUploadAssets(uploadData);
1399 if (ret != E_OK) {
1400 LOGE("TagUploadAssets report ERROR, cannot tag uploadAssets");
1401 return ret;
1402 }
1403 // get local water mark to be updated in future.
1404 ret = UpdateExtendTime(uploadData, innerProcessInfo.upLoadInfo.total, taskId, localMark);
1405 if (ret != E_OK) {
1406 LOGE("[CloudSyncer] Failed to get new local water mark in Cloud Sync Data, %d.", ret);
1407 }
1408 return ret;
1409 }
1410
SaveCloudWaterMark(const TableName & tableName)1411 int CloudSyncer::SaveCloudWaterMark(const TableName &tableName)
1412 {
1413 std::string cloudWaterMark;
1414 {
1415 std::lock_guard<std::mutex> autoLock(contextLock_);
1416 if (currentContext_.cloudWaterMarks.find(tableName) == currentContext_.cloudWaterMarks.end()) {
1417 LOGD("[CloudSyncer] Not found water mark just return");
1418 return E_OK;
1419 }
1420 cloudWaterMark = currentContext_.cloudWaterMarks[tableName];
1421 }
1422 int errCode = storageProxy_->SetCloudWaterMark(tableName, cloudWaterMark);
1423 if (errCode != E_OK) {
1424 LOGE("[CloudSyncer] Cannot set cloud water mark while Uploading, %d.", errCode);
1425 }
1426 return errCode;
1427 }
1428
SetUploadDataFlag(const TaskId taskId,CloudSyncData & uploadData)1429 void CloudSyncer::SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData)
1430 {
1431 std::lock_guard<std::mutex> autoLock(queueLock_);
1432 uploadData.isCloudForcePushStrategy = (cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH);
1433 }
1434
IsModeForcePush(const TaskId taskId)1435 bool CloudSyncer::IsModeForcePush(const TaskId taskId)
1436 {
1437 std::lock_guard<std::mutex> autoLock(queueLock_);
1438 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PUSH;
1439 }
1440
IsModeForcePull(const TaskId taskId)1441 bool CloudSyncer::IsModeForcePull(const TaskId taskId)
1442 {
1443 std::lock_guard<std::mutex> autoLock(queueLock_);
1444 return cloudTaskInfos_[taskId].mode == SYNC_MODE_CLOUD_FORCE_PULL;
1445 }
1446
DoUploadInner(const std::string & tableName,UploadParam & uploadParam)1447 int CloudSyncer::DoUploadInner(const std::string &tableName, UploadParam &uploadParam)
1448 {
1449 ContinueToken continueStmtToken = nullptr;
1450 CloudSyncData uploadData(tableName);
1451 SetUploadDataFlag(uploadParam.taskId, uploadData);
1452
1453 int ret = storageProxy_->GetCloudData(tableName, uploadParam.localMark, continueStmtToken, uploadData);
1454 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1455 LOGE("[CloudSyncer] Failed to get cloud data when upload, %d.", ret);
1456 return ret;
1457 }
1458
1459 InnerProcessInfo info;
1460 info.tableName = tableName;
1461 info.tableStatus = ProcessStatus::PROCESSING;
1462 info.upLoadInfo.total = static_cast<uint32_t>(uploadParam.count);
1463 uint32_t batchIndex = 0;
1464 bool getDataUnfinished = false;
1465
1466 while (!CheckCloudSyncDataEmpty(uploadData)) {
1467 getDataUnfinished = (ret == -E_UNFINISHED);
1468 ret = PreProcessBatchUpload(uploadParam.taskId, info, uploadData, uploadParam.localMark);
1469 if (ret != E_OK) {
1470 break;
1471 }
1472 info.upLoadInfo.batchIndex = ++batchIndex;
1473
1474 ret = DoBatchUpload(uploadData, uploadParam, info);
1475 if (ret != E_OK) {
1476 LOGE("[CloudSyncer] Failed to do upload, %d", ret);
1477 info.upLoadInfo.failCount = info.upLoadInfo.total - info.upLoadInfo.successCount;
1478 info.tableStatus = ProcessStatus::FINISHED;
1479 {
1480 std::lock_guard<std::mutex> autoLock(contextLock_);
1481 currentContext_.notifier->UpdateProcess(info);
1482 }
1483 break;
1484 }
1485
1486 uploadData = CloudSyncData(tableName);
1487
1488 if (continueStmtToken == nullptr) {
1489 break;
1490 }
1491 SetUploadDataFlag(uploadParam.taskId, uploadData);
1492
1493 ret = storageProxy_->GetCloudDataNext(continueStmtToken, uploadData);
1494 if ((ret != E_OK) && (ret != -E_UNFINISHED)) {
1495 LOGE("[CloudSyncer] Failed to get cloud data next when doing upload, %d.", ret);
1496 break;
1497 }
1498 }
1499
1500 if (getDataUnfinished) {
1501 storageProxy_->ReleaseContinueToken(continueStmtToken);
1502 }
1503 return ret;
1504 }
1505
PreHandleData(VBucket & datum,const std::vector<std::string> & pkColNames)1506 int CloudSyncer::PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames)
1507 {
1508 // type index of field in fields
1509 std::vector<std::pair<std::string, int32_t>> fieldAndIndex = {
1510 std::pair<std::string, int32_t>(CloudDbConstant::GID_FIELD, TYPE_INDEX<std::string>),
1511 std::pair<std::string, int32_t>(CloudDbConstant::CREATE_FIELD, TYPE_INDEX<int64_t>),
1512 std::pair<std::string, int32_t>(CloudDbConstant::MODIFY_FIELD, TYPE_INDEX<int64_t>),
1513 std::pair<std::string, int32_t>(CloudDbConstant::DELETE_FIELD, TYPE_INDEX<bool>),
1514 std::pair<std::string, int32_t>(CloudDbConstant::CURSOR_FIELD, TYPE_INDEX<std::string>)
1515 };
1516
1517 for (const auto &fieldIndex : fieldAndIndex) {
1518 if (datum.find(fieldIndex.first) == datum.end()) {
1519 LOGE("[CloudSyncer] Cloud data do not contain expected field: %s.", fieldIndex.first.c_str());
1520 return -E_CLOUD_ERROR;
1521 }
1522 if (datum[fieldIndex.first].index() != static_cast<size_t>(fieldIndex.second)) {
1523 LOGE("[CloudSyncer] Cloud data's field: %s, doesn't has expected type.", fieldIndex.first.c_str());
1524 return -E_CLOUD_ERROR;
1525 }
1526 }
1527
1528 if (std::get<bool>(datum[CloudDbConstant::DELETE_FIELD])) {
1529 RemoveDataExceptExtendInfo(datum, pkColNames);
1530 }
1531 std::lock_guard<std::mutex> autoLock(contextLock_);
1532 if (IsDataContainDuplicateAsset(currentContext_.assetFields[currentContext_.tableName], datum)) {
1533 LOGE("[CloudSyncer] Cloud data contain duplicate asset");
1534 return -E_CLOUD_ERROR;
1535 }
1536 return E_OK;
1537 }
1538
QueryCloudData(const std::string & tableName,std::string & cloudWaterMark,DownloadData & downloadData)1539 int CloudSyncer::QueryCloudData(const std::string &tableName, std::string &cloudWaterMark,
1540 DownloadData &downloadData)
1541 {
1542 VBucket extend = {
1543 {CloudDbConstant::CURSOR_FIELD, cloudWaterMark}
1544 };
1545 int ret = cloudDB_.Query(tableName, extend, downloadData.data);
1546 if (ret == -E_QUERY_END) {
1547 LOGD("[CloudSyncer] Download data from cloud database success and no more data need to be downloaded");
1548 return -E_QUERY_END;
1549 }
1550 if (ret == E_OK && downloadData.data.empty()) {
1551 if (extend[CloudDbConstant::CURSOR_FIELD].index() != TYPE_INDEX<std::string>) {
1552 LOGE("[CloudSyncer] cursor type is not valid=%d", extend[CloudDbConstant::CURSOR_FIELD].index());
1553 return -E_CLOUD_ERROR;
1554 }
1555 cloudWaterMark = std::get<std::string>(extend[CloudDbConstant::CURSOR_FIELD]);
1556 LOGD("[CloudSyncer] Download data is empty, try to use other cursor=%s", cloudWaterMark.c_str());
1557 return ret;
1558 }
1559 if (ret != E_OK) {
1560 LOGE("[CloudSyncer] Download data from cloud database unsuccess %d", ret);
1561 }
1562 return ret;
1563 }
1564
CheckParamValid(const std::vector<DeviceID> & devices,SyncMode mode)1565 int CloudSyncer::CheckParamValid(const std::vector<DeviceID> &devices, SyncMode mode)
1566 {
1567 if (devices.size() != 1) {
1568 LOGE("[CloudSyncer] invalid devices size %zu", devices.size());
1569 return -E_INVALID_ARGS;
1570 }
1571 for (const auto &dev: devices) {
1572 if (dev.empty() || dev.size() > DBConstant::MAX_DEV_LENGTH) {
1573 LOGE("[CloudSyncer] invalid device, size %zu", dev.size());
1574 return -E_INVALID_ARGS;
1575 }
1576 }
1577 if (mode >= SyncMode::SYNC_MODE_PUSH_ONLY && mode < SyncMode::SYNC_MODE_CLOUD_MERGE) {
1578 LOGE("[CloudSyncer] not support mode %d", static_cast<int>(mode));
1579 return -E_NOT_SUPPORT;
1580 }
1581 if (mode < SyncMode::SYNC_MODE_PUSH_ONLY || mode > SyncMode::SYNC_MODE_CLOUD_FORCE_PULL) {
1582 LOGE("[CloudSyncer] invalid mode %d", static_cast<int>(mode));
1583 return -E_INVALID_ARGS;
1584 }
1585 return E_OK;
1586 }
1587
CheckTaskIdValid(TaskId taskId)1588 int CloudSyncer::CheckTaskIdValid(TaskId taskId)
1589 {
1590 if (closed_) {
1591 LOGE("[CloudSyncer] DB is closed.");
1592 return -E_DB_CLOSED;
1593 }
1594 {
1595 std::lock_guard<std::mutex> autoLock(queueLock_);
1596 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1597 LOGE("[CloudSyncer] not found task.");
1598 return -E_INVALID_ARGS;
1599 }
1600 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1601 return cloudTaskInfos_[taskId].errCode;
1602 }
1603 }
1604 std::lock_guard<std::mutex> autoLock(contextLock_);
1605 return currentContext_.currentTaskId == taskId ? E_OK : -E_INVALID_ARGS;
1606 }
1607
GetCurrentTableName(std::string & tableName)1608 int CloudSyncer::GetCurrentTableName(std::string &tableName)
1609 {
1610 std::lock_guard<std::mutex> autoLock(contextLock_);
1611 if (currentContext_.tableName.empty()) {
1612 return -E_BUSY;
1613 }
1614 tableName = currentContext_.tableName;
1615 return E_OK;
1616 }
1617
TryToAddSyncTask(CloudTaskInfo && taskInfo)1618 int CloudSyncer::TryToAddSyncTask(CloudTaskInfo &&taskInfo)
1619 {
1620 if (closed_) {
1621 LOGW("[CloudSyncer] syncer is closed, should not sync now");
1622 return -E_DB_CLOSED;
1623 }
1624 std::lock_guard<std::mutex> autoLock(queueLock_);
1625 int errCode = CheckQueueSizeWithNoLock();
1626 if (errCode != E_OK) {
1627 return errCode;
1628 }
1629 do {
1630 currentTaskId_++;
1631 } while (currentTaskId_ == 0);
1632 taskInfo.taskId = currentTaskId_;
1633 taskQueue_.push_back(currentTaskId_);
1634 cloudTaskInfos_[currentTaskId_] = taskInfo;
1635 LOGI("[CloudSyncer] Add task ok, taskId %" PRIu64, cloudTaskInfos_[currentTaskId_].taskId);
1636 return E_OK;
1637 }
1638
CheckQueueSizeWithNoLock()1639 int CloudSyncer::CheckQueueSizeWithNoLock()
1640 {
1641 int32_t limit = queuedManualSyncLimit_;
1642 if (taskQueue_.size() >= static_cast<size_t>(limit)) {
1643 LOGW("[CloudSyncer] too much sync task");
1644 return -E_BUSY;
1645 }
1646 return E_OK;
1647 }
1648
PrepareSync(TaskId taskId)1649 int CloudSyncer::PrepareSync(TaskId taskId)
1650 {
1651 std::vector<std::string> tableNames;
1652 std::vector<std::string> devices;
1653 SyncMode mode;
1654 {
1655 std::lock_guard<std::mutex> autoLock(queueLock_);
1656 if (closed_ || cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1657 LOGW("[CloudSyncer] Abort sync because syncer is closed");
1658 return -E_DB_CLOSED;
1659 }
1660 tableNames = cloudTaskInfos_[taskId].table;
1661 mode = cloudTaskInfos_[taskId].mode;
1662 devices = cloudTaskInfos_[taskId].devices;
1663 }
1664 {
1665 // check current task is running or not
1666 std::lock_guard<std::mutex> autoLock(contextLock_);
1667 if (closed_ || currentContext_.currentTaskId != INVALID_TASK_ID) {
1668 LOGW("[CloudSyncer] Abort sync because syncer is closed or another task is running");
1669 return -E_DB_CLOSED;
1670 }
1671 currentContext_.currentTaskId = taskId;
1672 currentContext_.notifier = std::make_shared<ProcessNotifier>(this);
1673 currentContext_.strategy = StrategyFactory::BuildSyncStrategy(mode);
1674 currentContext_.notifier->Init(tableNames, devices);
1675 LOGI("[CloudSyncer] exec taskId %" PRIu64, taskId);
1676 }
1677 std::lock_guard<std::mutex> autoLock(queueLock_);
1678 cloudTaskInfos_[taskId].status = ProcessStatus::PROCESSING;
1679 return E_OK;
1680 }
1681
LockCloud(TaskId taskId)1682 int CloudSyncer::LockCloud(TaskId taskId)
1683 {
1684 int period;
1685 {
1686 auto res = cloudDB_.Lock();
1687 if (res.first != E_OK) {
1688 return res.first;
1689 }
1690 period = static_cast<int>(res.second) / HEARTBEAT_PERIOD;
1691 }
1692 int errCode = StartHeartBeatTimer(period, taskId);
1693 if (errCode != E_OK) {
1694 UnlockCloud();
1695 }
1696 return errCode;
1697 }
1698
UnlockCloud()1699 int CloudSyncer::UnlockCloud()
1700 {
1701 FinishHeartBeatTimer();
1702 int errCode = cloudDB_.UnLock();
1703 WaitAllHeartBeatTaskExit();
1704 return errCode;
1705 }
1706
StartHeartBeatTimer(int period,TaskId taskId)1707 int CloudSyncer::StartHeartBeatTimer(int period, TaskId taskId)
1708 {
1709 if (timerId_ != 0u) {
1710 LOGW("[CloudSyncer] HeartBeat timer has been start!");
1711 return E_OK;
1712 }
1713 TimerId timerId = 0;
1714 int errCode = RuntimeContext::GetInstance()->SetTimer(period, [this, taskId](TimerId timerId) {
1715 HeartBeat(timerId, taskId);
1716 return E_OK;
1717 }, nullptr, timerId);
1718 if (errCode != E_OK) {
1719 LOGE("[CloudSyncer] HeartBeat timer start failed %d", errCode);
1720 return errCode;
1721 }
1722 timerId_ = timerId;
1723 return E_OK;
1724 }
1725
FinishHeartBeatTimer()1726 void CloudSyncer::FinishHeartBeatTimer()
1727 {
1728 if (timerId_ == 0u) {
1729 return;
1730 }
1731 RuntimeContext::GetInstance()->RemoveTimer(timerId_, true);
1732 timerId_ = 0u;
1733 LOGD("[CloudSyncer] Finish heartbeat timer ok");
1734 }
1735
WaitAllHeartBeatTaskExit()1736 void CloudSyncer::WaitAllHeartBeatTaskExit()
1737 {
1738 std::unique_lock<std::mutex> uniqueLock(heartbeatMutex_);
1739 if (heartBeatCount_ <= 0) {
1740 return;
1741 }
1742 LOGD("[CloudSyncer] Begin wait all heartbeat task exit");
1743 heartbeatCv_.wait(uniqueLock, [this]() {
1744 return heartBeatCount_ <= 0;
1745 });
1746 LOGD("[CloudSyncer] End wait all heartbeat task exit");
1747 }
1748
HeartBeat(TimerId timerId,TaskId taskId)1749 void CloudSyncer::HeartBeat(TimerId timerId, TaskId taskId)
1750 {
1751 if (timerId_ != timerId) {
1752 return;
1753 }
1754 {
1755 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1756 heartBeatCount_++;
1757 }
1758 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, taskId]() {
1759 if (heartBeatCount_ >= HEARTBEAT_PERIOD) {
1760 // heartbeat block twice should finish task now
1761 SetTaskFailed(taskId, -E_CLOUD_ERROR);
1762 } else {
1763 int ret = cloudDB_.HeartBeat();
1764 if (ret != E_OK) {
1765 HeartBeatFailed(taskId, ret);
1766 } else {
1767 failedHeartBeatCount_ = 0;
1768 }
1769 }
1770 {
1771 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1772 heartBeatCount_--;
1773 }
1774 heartbeatCv_.notify_all();
1775 });
1776 if (errCode != E_OK) {
1777 LOGW("[CloudSyncer] schedule heartbeat task failed %d", errCode);
1778 {
1779 std::lock_guard<std::mutex> autoLock(heartbeatMutex_);
1780 heartBeatCount_--;
1781 }
1782 heartbeatCv_.notify_all();
1783 }
1784 }
1785
HeartBeatFailed(TaskId taskId,int errCode)1786 void CloudSyncer::HeartBeatFailed(TaskId taskId, int errCode)
1787 {
1788 failedHeartBeatCount_++;
1789 if (failedHeartBeatCount_ < MAX_HEARTBEAT_FAILED_LIMIT) {
1790 return;
1791 }
1792 LOGW("[CloudSyncer] heartbeat failed too much times!");
1793 FinishHeartBeatTimer();
1794 SetTaskFailed(taskId, errCode);
1795 }
1796
SetTaskFailed(TaskId taskId,int errCode)1797 void CloudSyncer::SetTaskFailed(TaskId taskId, int errCode)
1798 {
1799 std::lock_guard<std::mutex> autoLock(queueLock_);
1800 if (cloudTaskInfos_.find(taskId) == cloudTaskInfos_.end()) {
1801 return;
1802 }
1803 if (cloudTaskInfos_[taskId].errCode != E_OK) {
1804 return;
1805 }
1806 cloudTaskInfos_[taskId].errCode = errCode;
1807 }
1808
CheckCloudSyncDataValid(CloudSyncData uploadData,const std::string & tableName,const int64_t & count,TaskId & taskId)1809 int CloudSyncer::CheckCloudSyncDataValid(CloudSyncData uploadData, const std::string &tableName,
1810 const int64_t &count, TaskId &taskId)
1811 {
1812 size_t insRecordLen = uploadData.insData.record.size();
1813 size_t insExtendLen = uploadData.insData.extend.size();
1814 size_t updRecordLen = uploadData.updData.record.size();
1815 size_t updExtendLen = uploadData.updData.extend.size();
1816 size_t delRecordLen = uploadData.delData.record.size();
1817 size_t delExtendLen = uploadData.delData.extend.size();
1818
1819 bool syncDataValid = (uploadData.tableName == tableName) &&
1820 ((insRecordLen > 0 && insExtendLen > 0 && insRecordLen == insExtendLen) ||
1821 (updRecordLen > 0 && updExtendLen > 0 && updRecordLen == updExtendLen) ||
1822 (delRecordLen > 0 && delExtendLen > 0 && delRecordLen == delExtendLen));
1823 if (!syncDataValid) {
1824 LOGE("[CloudSyncer] upload data is empty but upload count is not zero or upload table name"
1825 " is not the same as table name of sync data.");
1826 return -E_INTERNAL_ERROR;
1827 }
1828 int64_t syncDataCount = static_cast<int64_t>(insRecordLen) + static_cast<int64_t>(updRecordLen) +
1829 static_cast<int64_t>(delRecordLen);
1830 if (syncDataCount > count) {
1831 LOGE("[CloudSyncer] Size of a batch of sync data is greater than upload data size.");
1832 return -E_INTERNAL_ERROR;
1833 }
1834
1835 return E_OK;
1836 }
1837
GetWaterMarkAndUpdateTime(std::vector<VBucket> & extend,Timestamp & waterMark)1838 int CloudSyncer::GetWaterMarkAndUpdateTime(std::vector<VBucket>& extend, Timestamp &waterMark)
1839 {
1840 for (auto &extendData: extend) {
1841 if (extendData.empty() || extendData.find(CloudDbConstant::MODIFY_FIELD) == extendData.end()) {
1842 LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
1843 return -E_INTERNAL_ERROR;
1844 }
1845 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::MODIFY_FIELD).index()) {
1846 LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t.");
1847 return -E_INTERNAL_ERROR;
1848 }
1849 if (extendData.empty() || extendData.find(CloudDbConstant::CREATE_FIELD) == extendData.end()) {
1850 LOGE("[CloudSyncer] VBucket is empty or MODIFY_FIELD doesn't exist.");
1851 return -E_INTERNAL_ERROR;
1852 }
1853 if (TYPE_INDEX<int64_t> != extendData.at(CloudDbConstant::CREATE_FIELD).index()) {
1854 LOGE("[CloudSyncer] VBucket's MODIFY_FIELD doestn't fit int64_t.");
1855 return -E_INTERNAL_ERROR;
1856 }
1857 waterMark = std::max(int64_t(waterMark), std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)));
1858 int64_t modifyTime =
1859 std::get<int64_t>(extendData.at(CloudDbConstant::MODIFY_FIELD)) / CloudDbConstant::TEN_THOUSAND;
1860 int64_t createTime =
1861 std::get<int64_t>(extendData.at(CloudDbConstant::CREATE_FIELD)) / CloudDbConstant::TEN_THOUSAND;
1862 extendData.insert_or_assign(CloudDbConstant::MODIFY_FIELD, modifyTime);
1863 extendData.insert_or_assign(CloudDbConstant::CREATE_FIELD, createTime);
1864 }
1865 return E_OK;
1866 }
1867
1868 // After doing a batch upload, we need to use CloudSyncData's maximum timestamp to update the water mark;
UpdateExtendTime(CloudSyncData & uploadData,const int64_t & count,TaskId taskId,Timestamp & waterMark)1869 int CloudSyncer::UpdateExtendTime(CloudSyncData &uploadData, const int64_t &count,
1870 TaskId taskId, Timestamp &waterMark)
1871 {
1872 int ret = CheckCloudSyncDataValid(uploadData, uploadData.tableName, count, taskId);
1873 if (ret != E_OK) {
1874 LOGE("[CloudSyncer] Invalid Sync Data when get local water mark.");
1875 return ret;
1876 }
1877 if (!uploadData.insData.extend.empty()) {
1878 if (uploadData.insData.record.size() != uploadData.insData.extend.size()) {
1879 LOGE("[CloudSyncer] Inconsistent size of inserted data.");
1880 return -E_INTERNAL_ERROR;
1881 }
1882 ret = GetWaterMarkAndUpdateTime(uploadData.insData.extend, waterMark);
1883 if (ret != E_OK) {
1884 return ret;
1885 }
1886 }
1887
1888 if (!uploadData.updData.extend.empty()) {
1889 if (uploadData.updData.record.size() != uploadData.updData.extend.size()) {
1890 LOGE("[CloudSyncer] Inconsistent size of updated data, %d.", -E_INTERNAL_ERROR);
1891 return -E_INTERNAL_ERROR;
1892 }
1893 ret = GetWaterMarkAndUpdateTime(uploadData.updData.extend, waterMark);
1894 if (ret != E_OK) {
1895 return ret;
1896 }
1897 }
1898
1899 if (!uploadData.delData.extend.empty()) {
1900 if (uploadData.delData.record.size() != uploadData.delData.extend.size()) {
1901 LOGE("[CloudSyncer] Inconsistent size of deleted data, %d.", -E_INTERNAL_ERROR);
1902 return -E_INTERNAL_ERROR;
1903 }
1904 ret = GetWaterMarkAndUpdateTime(uploadData.delData.extend, waterMark);
1905 if (ret != E_OK) {
1906 return ret;
1907 }
1908 }
1909 return E_OK;
1910 }
1911
ClearCloudSyncData(CloudSyncData & uploadData)1912 void CloudSyncer::ClearCloudSyncData(CloudSyncData &uploadData)
1913 {
1914 std::vector<VBucket>().swap(uploadData.insData.record);
1915 std::vector<VBucket>().swap(uploadData.insData.extend);
1916 std::vector<int64_t>().swap(uploadData.insData.rowid);
1917 std::vector<VBucket>().swap(uploadData.updData.record);
1918 std::vector<VBucket>().swap(uploadData.updData.extend);
1919 std::vector<VBucket>().swap(uploadData.delData.record);
1920 std::vector<VBucket>().swap(uploadData.delData.extend);
1921 }
1922
GetCloudSyncTaskCount()1923 int32_t CloudSyncer::GetCloudSyncTaskCount()
1924 {
1925 std::lock_guard<std::mutex> autoLock(queueLock_);
1926 return taskQueue_.size();
1927 }
1928
CleanCloudData(ClearMode mode,const std::vector<std::string> & tableNameList,const RelationalSchemaObject & localSchema)1929 int CloudSyncer::CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
1930 const RelationalSchemaObject &localSchema)
1931 {
1932 std::lock_guard<std::mutex> lock(syncMutex_);
1933 int index = 1;
1934 for (const auto &tableName: tableNameList) {
1935 LOGD("[CloudSyncer] Start clean cloud water mark. table index: %d.", index);
1936 int ret = storageProxy_->CleanWaterMark(tableName);
1937 if (ret != E_OK) {
1938 LOGE("[CloudSyncer] failed to put cloud water mark after clean cloud data, %d.", ret);
1939 return ret;
1940 }
1941 index++;
1942 }
1943 int errCode = storageProxy_->StartTransaction(TransactType::IMMEDIATE);
1944 if (errCode != E_OK) {
1945 LOGE("[CloudSyncer] failed to start Transaction before clean cloud data, %d", errCode);
1946 return errCode;
1947 }
1948
1949 std::vector<Asset> assets;
1950 errCode = storageProxy_->CleanCloudData(mode, tableNameList, localSchema, assets);
1951 if (errCode != E_OK) {
1952 LOGE("[CloudSyncer] failed to clean cloud data, %d.", errCode);
1953 storageProxy_->Rollback();
1954 return errCode;
1955 }
1956
1957 if (!assets.empty() && mode == FLAG_AND_DATA) {
1958 errCode = cloudDB_.RemoveLocalAssets(assets);
1959 if (errCode != E_OK) {
1960 LOGE("[Storage Executor] failed to remove local assets, %d.", errCode);
1961 storageProxy_->Rollback();
1962 return errCode;
1963 }
1964 }
1965
1966 storageProxy_->Commit();
1967 return errCode;
1968 }
1969
ModifyCloudDataTime(VBucket & data)1970 void CloudSyncer::ModifyCloudDataTime(VBucket &data)
1971 {
1972 // data already check field modify_field and create_field
1973 int64_t modifyTime = std::get<int64_t>(data[CloudDbConstant::MODIFY_FIELD]) * CloudDbConstant::TEN_THOUSAND;
1974 int64_t createTime = std::get<int64_t>(data[CloudDbConstant::CREATE_FIELD]) * CloudDbConstant::TEN_THOUSAND;
1975 data[CloudDbConstant::MODIFY_FIELD] = modifyTime;
1976 data[CloudDbConstant::CREATE_FIELD] = createTime;
1977 }
1978
UpdateCloudWaterMark(const SyncParam & param)1979 void CloudSyncer::UpdateCloudWaterMark(const SyncParam ¶m)
1980 {
1981 bool isUpdateCloudCursor = true;
1982 {
1983 std::lock_guard<std::mutex> autoLock(queueLock_);
1984 isUpdateCloudCursor = currentContext_.strategy->JudgeUpdateCursor();
1985 }
1986 // use the cursor of the last datum in data set to update cloud water mark
1987 if (isUpdateCloudCursor) {
1988 std::lock_guard<std::mutex> autoLock(contextLock_);
1989 currentContext_.cloudWaterMarks[param.info.tableName] = param.cloudWaterMark;
1990 }
1991 }
1992
CommitDownloadResult(InnerProcessInfo & info,DownloadCommitList & commitList)1993 int CloudSyncer::CommitDownloadResult(InnerProcessInfo &info, DownloadCommitList &commitList)
1994 {
1995 uint32_t successCount = 0;
1996 int ret = HandleDownloadResult(info.tableName, commitList, successCount);
1997 info.downLoadInfo.failCount += (commitList.size() - successCount);
1998 info.downLoadInfo.successCount -= (commitList.size() - successCount);
1999 if (ret != E_OK) {
2000 LOGE("Commit download result failed.%d", ret);
2001 }
2002 commitList.clear();
2003 return ret;
2004 }
2005
GetIdentify() const2006 std::string CloudSyncer::GetIdentify() const
2007 {
2008 return id_;
2009 }
2010
TagStatusByStrategy(bool isExist,SyncParam & param,DataInfo & dataInfo,OpType & strategyOpResult)2011 int CloudSyncer::TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult)
2012 {
2013 strategyOpResult = OpType::NOT_HANDLE;
2014 // ignore same record with local generate data
2015 if (dataInfo.localInfo.logInfo.device.empty() &&
2016 !NeedSaveData(dataInfo.localInfo.logInfo, dataInfo.cloudLogInfo)) {
2017 // not handle same data
2018 return E_OK;
2019 }
2020 {
2021 std::lock_guard<std::mutex> autoLock(contextLock_);
2022 if (!currentContext_.strategy) {
2023 LOGE("[CloudSyncer] strategy has not been set when tag status, %d.", -E_INTERNAL_ERROR);
2024 return -E_INTERNAL_ERROR;
2025 }
2026 strategyOpResult = currentContext_.strategy->TagSyncDataStatus(isExist, dataInfo.localInfo.logInfo,
2027 dataInfo.cloudLogInfo, param.deletePrimaryKeySet);
2028 }
2029 return E_OK;
2030 }
2031 } // namespace DistributedDB
2032